From 34d9b847e0522d53e9af1f534f72ec78eb1aa83e Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Tue, 3 Oct 2023 13:49:39 +0100 Subject: [PATCH 1/8] Add functions to count object in use and objects in pool --- src/pools.jl | 29 +++++++++++++++- test/pools.jl | 93 ++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 116 insertions(+), 6 deletions(-) diff --git a/src/pools.jl b/src/pools.jl index ee95c39..dd35797 100644 --- a/src/pools.jl +++ b/src/pools.jl @@ -51,6 +51,29 @@ safesizehint!(x, n) = sizehint!(x, min(4096, n)) # determines whether we'll look up object caches in .keyedvalues or .values iskeyed(::Pool{K}) where {K} = K !== Nothing +""" + Pools.max(pool::Pool) -> Int + +Return the maximum number of objects permitted to be in use at the same time. +See `Pools.permits(pool)` for the number of objects currently in use. +""" +max(pool::Pool) = Base.@lock pool.lock pool.max + +""" + Pools.permits(pool::Pool) -> Int + +Return the number of objects currently in use. Less than or equal to `Pools.max(pool)`. +""" +permits(pool::Pool) = Base.@lock pool.lock pool.cur + +""" + Pools.depth(pool::Pool) -> Int + +Return the number of objects in the pool available for reuse. +""" +depth(pool::Pool) = Base.@lock pool.lock mapreduce(length, +, values(pool.keyedvalues); init=0) +depth(pool::Pool{Nothing}) = Base.@lock pool.lock length(pool.values) + """ drain!(pool) @@ -133,13 +156,17 @@ just the "permit" will be returned to the pool. """ function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T} key isa K || keyerror(key, K) + keyed = iskeyed(pool) Base.@lock pool.lock begin + # if keyed && !haskey(pool.keyedvalues, key) + # throw(Base.KeyError(key)) + # end # return the permit releasepermit(pool) # if we're given an object, we'll put it back in the pool if obj !== nothing # if an invalid key is provided, we let the KeyError propagate - objs = iskeyed(pool) ? pool.keyedvalues[key] : pool.values + objs = keyed ? pool.keyedvalues[key] : pool.values push!(objs, obj) end end diff --git a/test/pools.jl b/test/pools.jl index d2ff678..4fff367 100644 --- a/test/pools.jl +++ b/test/pools.jl @@ -1,24 +1,46 @@ -using ConcurrentUtilities, Test +using ConcurrentUtilities.Pools, Test @testset "Pools" begin + pool_size = length∘Pools.values @testset "nonkeyed and pool basics" begin pool = Pool{Int}(3) + @test Pools.max(pool) == 3 + @test Pools.permits(pool) == 0 + @test Pools.depth(pool) == 0 + # acquire an object from the pool x1 = acquire(() -> 1, pool) # no existing objects in the pool, so our function was called to create a new one @test x1 == 1 + @test Pools.max(pool) == 3 + @test Pools.permits(pool) == 1 + @test Pools.depth(pool) == 0 + # release back to the pool for reuse release(pool, x1) + @test Pools.permits(pool) == 0 + @test Pools.depth(pool) == 1 + # acquire another object from the pool x1 = acquire(() -> 2, pool) # this time, the pool had an existing object, so our function was not called @test x1 == 1 + @test Pools.permits(pool) == 1 + @test Pools.depth(pool) == 0 + # but now there are no objects to reuse again, so the next acquire will call our function x2 = acquire(() -> 2, pool) @test x2 == 2 + @test Pools.permits(pool) == 2 + @test Pools.depth(pool) == 0 + x3 = acquire(() -> 3, pool) @test x3 == 3 - # the pool is now at capacity, so the next acquire will block until an object is released + @test Pools.permits(pool) == 3 + @test Pools.depth(pool) == 0 + + # the pool is now at `Pools.max`, so the next acquire will block until an object is released + @test Pools.permits(pool) == Pools.max(pool) tsk = @async acquire(() -> 4, pool; forcenew=true) yield() @test !istaskdone(tsk) @@ -28,60 +50,107 @@ using ConcurrentUtilities, Test x1 = fetch(tsk) # even though we released 1 for reuse, we passed forcenew, so our function was called to create new @test x1 == 4 + @test Pools.permits(pool) == 3 + @test Pools.depth(pool) == 1 + # error to try and provide a key to a non-keyed pool @test_throws ArgumentError acquire(() -> 1, pool, 1) + # release objects back to the pool release(pool, x1) release(pool, x2) release(pool, x3) + @test Pools.permits(pool) == 0 + @test Pools.depth(pool) == 4 + # acquire an object, but checking isvalid x1 = acquire(() -> 5, pool; isvalid=x -> x == 1) @test x1 == 1 + @test Pools.permits(pool) == 1 + # no valid objects, so our function was called to create a new one x2 = acquire(() -> 6, pool; isvalid=x -> x == 1) @test x2 == 6 - # we have one slot left in the pool, we now throw while creating new + @test Pools.permits(pool) == 2 + + # we have one permit left, we now throw while creating a new object # and we want to test that the permit isn't permanently lost for the pool @test_throws ErrorException acquire(() -> error("oops"), pool; forcenew=true) + @test Pools.permits(pool) == 2 + # we can still acquire a new object x3 = acquire(() -> 7, pool; forcenew=true) @test x3 == 7 + @test Pools.permits(pool) == 3 + # release objects back to the pool + drain!(pool) release(pool, x1) release(pool, x2) release(pool, x3) + @test Pools.permits(pool) == 0 + @test Pools.depth(pool) == 3 + # try to do an invalid release @test_throws ArgumentError release(pool, 10) + # test that the invalid release didn't push the object to our pool for reuse x1 = acquire(() -> 8, pool) @test x1 == 7 + @test Pools.permits(pool) == 1 + @test Pools.depth(pool) == 2 # calling drain! removes all objects for reuse drain!(pool) + @test Pools.permits(pool) == 1 + @test Pools.depth(pool) == 0 + x2 = acquire(() -> 9, pool) @test x2 == 9 + @test Pools.permits(pool) == 2 + @test Pools.depth(pool) == 0 end @testset "keyed pool" begin # now test a keyed pool pool = Pool{String, Int}(3) + @test Pools.max(pool) == 3 + @test Pools.permits(pool) == 0 + @test Pools.depth(pool) == 0 + # acquire an object from the pool x1 = acquire(() -> 1, pool, "a") # no existing objects in the pool, so our function was called to create a new one @test x1 == 1 + @test Pools.permits(pool) == 1 + @test Pools.depth(pool) == 0 + # release back to the pool for reuse release(pool, "a", x1) + @test Pools.permits(pool) == 0 + @test Pools.depth(pool) == 1 + # test for a different key x2 = acquire(() -> 2, pool, "b") # there's an existing object, but for a different key, so we don't reuse @test x2 == 2 + @test Pools.permits(pool) == 1 + @test Pools.depth(pool) == 1 + # acquire another object from the pool x1 = acquire(() -> 2, pool, "a") # this time, the pool had an existing object, so our function was not called @test x1 == 1 + @test Pools.permits(pool) == 2 + @test Pools.depth(pool) == 0 + x3 = acquire(() -> 3, pool, "a") @test x3 == 3 + @test Pools.permits(pool) == 3 + @test Pools.depth(pool) == 0 + # the pool is now at capacity, so the next acquire will block until an object is released # even though we've acquired using different keys, the capacity is shared across the pool + @test Pools.permits(pool) == Pools.max(pool) tsk = @async acquire(() -> 4, pool, "c"; forcenew=true) yield() @test !istaskdone(tsk) @@ -91,13 +160,27 @@ using ConcurrentUtilities, Test x1 = fetch(tsk) # even though we released 1 for reuse, we passed forcenew, so our function was called to create new @test x1 == 4 + @test Pools.permits(pool) == 3 + @test Pools.depth(pool) == 1 + # error to try and provide an invalid key to a keyed pool @test_throws ArgumentError acquire(() -> 1, pool, 1) - # error to release an invalid key back to the pool - @test_throws KeyError release(pool, "z", 1) + @test Pools.permits(pool) == 3 + @test Pools.depth(pool) == 1 + # error to *not* provide a key to a keyed pool @test_throws ArgumentError acquire(() -> 1, pool) + @test Pools.permits(pool) == 3 + @test Pools.depth(pool) == 1 + # error to *not* provide a key when releasing to a keyed pool @test_throws ArgumentError release(pool) + @test Pools.permits(pool) == 3 + @test Pools.depth(pool) == 1 + + # error to release an invalid key back to the pool + @test_throws KeyError release(pool, "z", 1) + @test_broken Pools.permits(pool) == 3 + @test Pools.depth(pool) == 1 end end From 35e940568139f5656057344ddef3eefd237213fc Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Tue, 3 Oct 2023 18:25:35 +0100 Subject: [PATCH 2/8] Define `keytype` and `valtype` for `Pool` --- src/pools.jl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/pools.jl b/src/pools.jl index dd35797..11c5c17 100644 --- a/src/pools.jl +++ b/src/pools.jl @@ -51,6 +51,12 @@ safesizehint!(x, n) = sizehint!(x, min(4096, n)) # determines whether we'll look up object caches in .keyedvalues or .values iskeyed(::Pool{K}) where {K} = K !== Nothing +Base.keytype(::Type{<:Pool{K}}) where {K} = K +Base.keytype(p::Pool) = keytype(typeof(p)) + +Base.valtype(::Type{<:Pool{<:Any, T}}) where {T} = T +Base.valtype(p::Pool) = valtype(typeof(p)) + """ Pools.max(pool::Pool) -> Int From 4356d27ce401708cee7c210882d5d64f83baf3fa Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Tue, 3 Oct 2023 22:55:25 +0100 Subject: [PATCH 3/8] fixup! Define `keytype` and `valtype` for `Pool` --- test/pools.jl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/pools.jl b/test/pools.jl index 4fff367..87dce9c 100644 --- a/test/pools.jl +++ b/test/pools.jl @@ -4,6 +4,9 @@ using ConcurrentUtilities.Pools, Test pool_size = length∘Pools.values @testset "nonkeyed and pool basics" begin pool = Pool{Int}(3) + @test keytype(pool) === Nothing + @test valtype(pool) === Int + @test Pools.max(pool) == 3 @test Pools.permits(pool) == 0 @test Pools.depth(pool) == 0 @@ -113,6 +116,9 @@ using ConcurrentUtilities.Pools, Test @testset "keyed pool" begin # now test a keyed pool pool = Pool{String, Int}(3) + @test keytype(pool) === String + @test valtype(pool) === Int + @test Pools.max(pool) == 3 @test Pools.permits(pool) == 0 @test Pools.depth(pool) == 0 From 4dd764d92c29bcf550fe3772be87632af966180a Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Tue, 3 Oct 2023 22:56:59 +0100 Subject: [PATCH 4/8] fixup! Add functions to count object in use and objects in pool --- src/pools.jl | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/pools.jl b/src/pools.jl index 11c5c17..70ff3d4 100644 --- a/src/pools.jl +++ b/src/pools.jl @@ -162,17 +162,13 @@ just the "permit" will be returned to the pool. """ function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T} key isa K || keyerror(key, K) - keyed = iskeyed(pool) Base.@lock pool.lock begin - # if keyed && !haskey(pool.keyedvalues, key) - # throw(Base.KeyError(key)) - # end # return the permit releasepermit(pool) # if we're given an object, we'll put it back in the pool if obj !== nothing # if an invalid key is provided, we let the KeyError propagate - objs = keyed ? pool.keyedvalues[key] : pool.values + objs = iskeyed(pool) ? pool.keyedvalues[key] : pool.values push!(objs, obj) end end From e714c70b721efe5cebeb605fc972416a52f9c663 Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Wed, 4 Oct 2023 17:40:13 +0100 Subject: [PATCH 5/8] Bump version --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 698fcaa..316aa30 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ConcurrentUtilities" uuid = "f0e56b4a-5159-44fe-b623-3e5288b988bb" authors = ["Jacob Quinn "] -version = "2.2.1" +version = "2.3.0" [deps] Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" From 34f8f1b3316a62e7cccd75b8e35136496a69ef22 Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Tue, 10 Oct 2023 18:06:18 +0100 Subject: [PATCH 6/8] Rename in terms of "use" --- src/pools.jl | 46 +++++++++++---------- test/pools.jl | 112 +++++++++++++++++++++++++------------------------- 2 files changed, 80 insertions(+), 78 deletions(-) diff --git a/src/pools.jl b/src/pools.jl index 70ff3d4..d5b3730 100644 --- a/src/pools.jl +++ b/src/pools.jl @@ -8,21 +8,23 @@ import Base: acquire, release Pool{K, T}(max::Int=4096) A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects -of type `K`. Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a +of type `K`. + +Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a function that returns a new object of type `T`. The `key` argument is optional and can be used to lookup objects that match a certain criteria -(a Dict is used internally, so matching is `isequal`). +(a `Dict` is used internally, so matching is `isequal`). -The `max` argument will limit the number of objects -that can be acquired at any given time. If the limit has been reached, `acquire` will -block until an object is returned to the pool via `release`. +The `max` argument will limit the number of objects that can be in use at any given time. +If the max usage has been reached, `acquire` will block until an object is released +via `release`. -By default, `release(pool, obj)` will return the object to the pool for reuse. -`release(pool)` will return the "permit" to the pool while not returning -any object for reuse. +- `release(pool, obj)` will return the object to the pool for reuse. +- `release(pool)` will decrement the number in use but not return any object for reuse. +- `drain!` can be used to remove objects that have been returned to the pool for reuse; + it does *not* release any objects that are in use. -`drain!` can be used to remove any cached objects for reuse, but it does *not* release -any active acquires. +See also `acquire`, `release`, `Pools.max_usage`, `Pools.in_use`, `Pools.in_pool`, `drain!`. """ mutable struct Pool{K, T} lock::Threads.Condition @@ -58,27 +60,27 @@ Base.valtype(::Type{<:Pool{<:Any, T}}) where {T} = T Base.valtype(p::Pool) = valtype(typeof(p)) """ - Pools.max(pool::Pool) -> Int + Pools.max_usage(pool::Pool) -> Int Return the maximum number of objects permitted to be in use at the same time. -See `Pools.permits(pool)` for the number of objects currently in use. +See `Pools.in_use(pool)` for the number of objects currently in use. """ -max(pool::Pool) = Base.@lock pool.lock pool.max +max_usage(pool::Pool) = Base.@lock pool.lock pool.max """ - Pools.permits(pool::Pool) -> Int + Pools.in_use(pool::Pool) -> Int -Return the number of objects currently in use. Less than or equal to `Pools.max(pool)`. +Return the number of objects currently in use. Less than or equal to `Pools.max_usage(pool)`. """ -permits(pool::Pool) = Base.@lock pool.lock pool.cur +in_use(pool::Pool) = Base.@lock pool.lock pool.cur """ - Pools.depth(pool::Pool) -> Int + Pools.in_pool(pool::Pool) -> Int Return the number of objects in the pool available for reuse. """ -depth(pool::Pool) = Base.@lock pool.lock mapreduce(length, +, values(pool.keyedvalues); init=0) -depth(pool::Pool{Nothing}) = Base.@lock pool.lock length(pool.values) +in_pool(pool::Pool) = Base.@lock pool.lock mapreduce(length, +, values(pool.keyedvalues); init=0) +in_pool(pool::Pool{Nothing}) = Base.@lock pool.lock length(pool.values) """ drain!(pool) @@ -101,7 +103,7 @@ end TRUE(x) = true @noinline keyerror(key, K) = throw(ArgumentError("invalid key `$key` provided for pool key type $K")) -@noinline releaseerror() = throw(ArgumentError("cannot release permit when pool is empty")) +@noinline releaseerror() = throw(ArgumentError("cannot release when no objects are in use")) # NOTE: assumes you have the lock! function releasepermit(pool::Pool) @@ -155,10 +157,10 @@ end release(pool::Pool{K, T}, obj::T) release(pool::Pool{K, T}) -Return an object to a `pool`, optionally keyed by the provided `key`. +Release an object from usage by a `pool`, optionally keyed by the provided `key`. If `obj` is provided, it will be returned to the pool for reuse. Otherwise, if `nothing` is returned, or `release(pool)` is called, -just the "permit" will be returned to the pool. +the usage count will be decremented without an object being returned to the pool for reuse. """ function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T} key isa K || keyerror(key, K) diff --git a/test/pools.jl b/test/pools.jl index 87dce9c..fa3bd69 100644 --- a/test/pools.jl +++ b/test/pools.jl @@ -7,43 +7,43 @@ using ConcurrentUtilities.Pools, Test @test keytype(pool) === Nothing @test valtype(pool) === Int - @test Pools.max(pool) == 3 - @test Pools.permits(pool) == 0 - @test Pools.depth(pool) == 0 + @test Pools.max_usage(pool) == 3 + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 0 # acquire an object from the pool x1 = acquire(() -> 1, pool) # no existing objects in the pool, so our function was called to create a new one @test x1 == 1 - @test Pools.max(pool) == 3 - @test Pools.permits(pool) == 1 - @test Pools.depth(pool) == 0 + @test Pools.max_usage(pool) == 3 + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 0 # release back to the pool for reuse release(pool, x1) - @test Pools.permits(pool) == 0 - @test Pools.depth(pool) == 1 + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 1 # acquire another object from the pool x1 = acquire(() -> 2, pool) # this time, the pool had an existing object, so our function was not called @test x1 == 1 - @test Pools.permits(pool) == 1 - @test Pools.depth(pool) == 0 + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 0 # but now there are no objects to reuse again, so the next acquire will call our function x2 = acquire(() -> 2, pool) @test x2 == 2 - @test Pools.permits(pool) == 2 - @test Pools.depth(pool) == 0 + @test Pools.in_use(pool) == 2 + @test Pools.in_pool(pool) == 0 x3 = acquire(() -> 3, pool) @test x3 == 3 - @test Pools.permits(pool) == 3 - @test Pools.depth(pool) == 0 + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 0 - # the pool is now at `Pools.max`, so the next acquire will block until an object is released - @test Pools.permits(pool) == Pools.max(pool) + # the pool is now at `Pools.max_usage`, so the next acquire will block until an object is released + @test Pools.in_use(pool) == Pools.max_usage(pool) tsk = @async acquire(() -> 4, pool; forcenew=true) yield() @test !istaskdone(tsk) @@ -53,8 +53,8 @@ using ConcurrentUtilities.Pools, Test x1 = fetch(tsk) # even though we released 1 for reuse, we passed forcenew, so our function was called to create new @test x1 == 4 - @test Pools.permits(pool) == 3 - @test Pools.depth(pool) == 1 + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 # error to try and provide a key to a non-keyed pool @test_throws ArgumentError acquire(() -> 1, pool, 1) @@ -63,36 +63,36 @@ using ConcurrentUtilities.Pools, Test release(pool, x1) release(pool, x2) release(pool, x3) - @test Pools.permits(pool) == 0 - @test Pools.depth(pool) == 4 + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 4 # acquire an object, but checking isvalid x1 = acquire(() -> 5, pool; isvalid=x -> x == 1) @test x1 == 1 - @test Pools.permits(pool) == 1 + @test Pools.in_use(pool) == 1 # no valid objects, so our function was called to create a new one x2 = acquire(() -> 6, pool; isvalid=x -> x == 1) @test x2 == 6 - @test Pools.permits(pool) == 2 + @test Pools.in_use(pool) == 2 # we have one permit left, we now throw while creating a new object # and we want to test that the permit isn't permanently lost for the pool @test_throws ErrorException acquire(() -> error("oops"), pool; forcenew=true) - @test Pools.permits(pool) == 2 + @test Pools.in_use(pool) == 2 # we can still acquire a new object x3 = acquire(() -> 7, pool; forcenew=true) @test x3 == 7 - @test Pools.permits(pool) == 3 + @test Pools.in_use(pool) == 3 # release objects back to the pool drain!(pool) release(pool, x1) release(pool, x2) release(pool, x3) - @test Pools.permits(pool) == 0 - @test Pools.depth(pool) == 3 + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 3 # try to do an invalid release @test_throws ArgumentError release(pool, 10) @@ -100,17 +100,17 @@ using ConcurrentUtilities.Pools, Test # test that the invalid release didn't push the object to our pool for reuse x1 = acquire(() -> 8, pool) @test x1 == 7 - @test Pools.permits(pool) == 1 - @test Pools.depth(pool) == 2 + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 2 # calling drain! removes all objects for reuse drain!(pool) - @test Pools.permits(pool) == 1 - @test Pools.depth(pool) == 0 + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 0 x2 = acquire(() -> 9, pool) @test x2 == 9 - @test Pools.permits(pool) == 2 - @test Pools.depth(pool) == 0 + @test Pools.in_use(pool) == 2 + @test Pools.in_pool(pool) == 0 end @testset "keyed pool" begin @@ -119,44 +119,44 @@ using ConcurrentUtilities.Pools, Test @test keytype(pool) === String @test valtype(pool) === Int - @test Pools.max(pool) == 3 - @test Pools.permits(pool) == 0 - @test Pools.depth(pool) == 0 + @test Pools.max_usage(pool) == 3 + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 0 # acquire an object from the pool x1 = acquire(() -> 1, pool, "a") # no existing objects in the pool, so our function was called to create a new one @test x1 == 1 - @test Pools.permits(pool) == 1 - @test Pools.depth(pool) == 0 + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 0 # release back to the pool for reuse release(pool, "a", x1) - @test Pools.permits(pool) == 0 - @test Pools.depth(pool) == 1 + @test Pools.in_use(pool) == 0 + @test Pools.in_pool(pool) == 1 # test for a different key x2 = acquire(() -> 2, pool, "b") # there's an existing object, but for a different key, so we don't reuse @test x2 == 2 - @test Pools.permits(pool) == 1 - @test Pools.depth(pool) == 1 + @test Pools.in_use(pool) == 1 + @test Pools.in_pool(pool) == 1 # acquire another object from the pool x1 = acquire(() -> 2, pool, "a") # this time, the pool had an existing object, so our function was not called @test x1 == 1 - @test Pools.permits(pool) == 2 - @test Pools.depth(pool) == 0 + @test Pools.in_use(pool) == 2 + @test Pools.in_pool(pool) == 0 x3 = acquire(() -> 3, pool, "a") @test x3 == 3 - @test Pools.permits(pool) == 3 - @test Pools.depth(pool) == 0 + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 0 # the pool is now at capacity, so the next acquire will block until an object is released # even though we've acquired using different keys, the capacity is shared across the pool - @test Pools.permits(pool) == Pools.max(pool) + @test Pools.in_use(pool) == Pools.max_usage(pool) tsk = @async acquire(() -> 4, pool, "c"; forcenew=true) yield() @test !istaskdone(tsk) @@ -166,27 +166,27 @@ using ConcurrentUtilities.Pools, Test x1 = fetch(tsk) # even though we released 1 for reuse, we passed forcenew, so our function was called to create new @test x1 == 4 - @test Pools.permits(pool) == 3 - @test Pools.depth(pool) == 1 + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 # error to try and provide an invalid key to a keyed pool @test_throws ArgumentError acquire(() -> 1, pool, 1) - @test Pools.permits(pool) == 3 - @test Pools.depth(pool) == 1 + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 # error to *not* provide a key to a keyed pool @test_throws ArgumentError acquire(() -> 1, pool) - @test Pools.permits(pool) == 3 - @test Pools.depth(pool) == 1 + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 # error to *not* provide a key when releasing to a keyed pool @test_throws ArgumentError release(pool) - @test Pools.permits(pool) == 3 - @test Pools.depth(pool) == 1 + @test Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 # error to release an invalid key back to the pool @test_throws KeyError release(pool, "z", 1) - @test_broken Pools.permits(pool) == 3 - @test Pools.depth(pool) == 1 + @test_broken Pools.in_use(pool) == 3 + @test Pools.in_pool(pool) == 1 end end From ad2f4b009c2a52ae4d624b201dc9c495a8dd8ffc Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Tue, 10 Oct 2023 18:50:32 +0100 Subject: [PATCH 7/8] Rename `max` -> `limit` --- src/pools.jl | 32 ++++++++++++++++---------------- test/pools.jl | 12 ++++++------ 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/pools.jl b/src/pools.jl index d5b3730..80b712a 100644 --- a/src/pools.jl +++ b/src/pools.jl @@ -4,8 +4,8 @@ export Pool, acquire, release, drain! import Base: acquire, release """ - Pool{T}(max::Int=4096) - Pool{K, T}(max::Int=4096) + Pool{T}(limit::Int=4096) + Pool{K, T}(limit::Int=4096) A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects of type `K`. @@ -15,8 +15,8 @@ function that returns a new object of type `T`. The `key` argument is optional and can be used to lookup objects that match a certain criteria (a `Dict` is used internally, so matching is `isequal`). -The `max` argument will limit the number of objects that can be in use at any given time. -If the max usage has been reached, `acquire` will block until an object is released +The `limit` argument will limit the number of objects that can be in use at any given time. +If the limit has been reached, `acquire` will block until an object is released via `release`. - `release(pool, obj)` will return the object to the pool for reuse. @@ -24,21 +24,21 @@ via `release`. - `drain!` can be used to remove objects that have been returned to the pool for reuse; it does *not* release any objects that are in use. -See also `acquire`, `release`, `Pools.max_usage`, `Pools.in_use`, `Pools.in_pool`, `drain!`. +See also `acquire`, `release`, `Pools.limit`, `Pools.in_use`, `Pools.in_pool`, `drain!`. """ mutable struct Pool{K, T} lock::Threads.Condition - max::Int + limit::Int cur::Int keyedvalues::Dict{K, Vector{T}} values::Vector{T} - function Pool{K, T}(max::Int=4096) where {K, T} + function Pool{K, T}(limit::Int=4096) where {K, T} T === Nothing && throw(ArgumentError("Pool type can not be `Nothing`")) - x = new(Threads.Condition(), max, 0) + x = new(Threads.Condition(), limit, 0) if K === Nothing x.values = T[] - safesizehint!(x.values, max) + safesizehint!(x.values, limit) else x.keyedvalues = Dict{K, Vector{T}}() end @@ -46,7 +46,7 @@ mutable struct Pool{K, T} end end -Pool{T}(max::Int=4096) where {T} = Pool{Nothing, T}(max) +Pool{T}(limit::Int=4096) where {T} = Pool{Nothing, T}(limit) safesizehint!(x, n) = sizehint!(x, min(4096, n)) @@ -60,17 +60,17 @@ Base.valtype(::Type{<:Pool{<:Any, T}}) where {T} = T Base.valtype(p::Pool) = valtype(typeof(p)) """ - Pools.max_usage(pool::Pool) -> Int + Pools.limit(pool::Pool) -> Int Return the maximum number of objects permitted to be in use at the same time. See `Pools.in_use(pool)` for the number of objects currently in use. """ -max_usage(pool::Pool) = Base.@lock pool.lock pool.max +limit(pool::Pool) = Base.@lock pool.lock pool.limit """ Pools.in_use(pool::Pool) -> Int -Return the number of objects currently in use. Less than or equal to `Pools.max_usage(pool)`. +Return the number of objects currently in use. Less than or equal to `Pools.limit(pool)`. """ in_use(pool::Pool) = Base.@lock pool.lock pool.cur @@ -123,19 +123,19 @@ The `forcenew` keyword argument can be used to force the creation of a new objec The `isvalid` keyword argument can be used to specify a function that will be called to determine if an object is still valid for reuse. By default, all objects are considered valid. If there are no objects available for reuse, `f` will be called to create a new object. -If the pool is already at its maximum capacity, `acquire` will block until an object is returned to the pool via `release`. +If the pool is already at its usage limit, `acquire` will block until an object is returned to the pool via `release`. """ function Base.acquire(f, pool::Pool{K, T}, key=nothing; forcenew::Bool=false, isvalid::Function=TRUE) where {K, T} key isa K || keyerror(key, K) Base.@lock pool.lock begin # first get a permit - while pool.cur >= pool.max + while pool.cur >= pool.limit wait(pool.lock) end pool.cur += 1 # now see if we can get an object from the pool for reuse if !forcenew - objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.max), pool.keyedvalues, key) : pool.values + objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.limit), pool.keyedvalues, key) : pool.values while !isempty(objs) obj = pop!(objs) isvalid(obj) && return obj diff --git a/test/pools.jl b/test/pools.jl index fa3bd69..bd98521 100644 --- a/test/pools.jl +++ b/test/pools.jl @@ -7,7 +7,7 @@ using ConcurrentUtilities.Pools, Test @test keytype(pool) === Nothing @test valtype(pool) === Int - @test Pools.max_usage(pool) == 3 + @test Pools.limit(pool) == 3 @test Pools.in_use(pool) == 0 @test Pools.in_pool(pool) == 0 @@ -15,7 +15,7 @@ using ConcurrentUtilities.Pools, Test x1 = acquire(() -> 1, pool) # no existing objects in the pool, so our function was called to create a new one @test x1 == 1 - @test Pools.max_usage(pool) == 3 + @test Pools.limit(pool) == 3 @test Pools.in_use(pool) == 1 @test Pools.in_pool(pool) == 0 @@ -42,8 +42,8 @@ using ConcurrentUtilities.Pools, Test @test Pools.in_use(pool) == 3 @test Pools.in_pool(pool) == 0 - # the pool is now at `Pools.max_usage`, so the next acquire will block until an object is released - @test Pools.in_use(pool) == Pools.max_usage(pool) + # the pool is now at `Pools.limit`, so the next acquire will block until an object is released + @test Pools.in_use(pool) == Pools.limit(pool) tsk = @async acquire(() -> 4, pool; forcenew=true) yield() @test !istaskdone(tsk) @@ -119,7 +119,7 @@ using ConcurrentUtilities.Pools, Test @test keytype(pool) === String @test valtype(pool) === Int - @test Pools.max_usage(pool) == 3 + @test Pools.limit(pool) == 3 @test Pools.in_use(pool) == 0 @test Pools.in_pool(pool) == 0 @@ -156,7 +156,7 @@ using ConcurrentUtilities.Pools, Test # the pool is now at capacity, so the next acquire will block until an object is released # even though we've acquired using different keys, the capacity is shared across the pool - @test Pools.in_use(pool) == Pools.max_usage(pool) + @test Pools.in_use(pool) == Pools.limit(pool) tsk = @async acquire(() -> 4, pool, "c"; forcenew=true) yield() @test !istaskdone(tsk) From adf4fa05917339fc8d42296c2c410c42786b07d6 Mon Sep 17 00:00:00 2001 From: Nick Robinson Date: Wed, 1 Nov 2023 14:41:19 +0000 Subject: [PATCH 8/8] Docs for keytype/valtype --- src/pools.jl | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/pools.jl b/src/pools.jl index 80b712a..a4c0af9 100644 --- a/src/pools.jl +++ b/src/pools.jl @@ -25,6 +25,7 @@ via `release`. it does *not* release any objects that are in use. See also `acquire`, `release`, `Pools.limit`, `Pools.in_use`, `Pools.in_pool`, `drain!`. +The key and object types can be inspected with `keytype` and `valtype` respectively. """ mutable struct Pool{K, T} lock::Threads.Condition @@ -53,9 +54,20 @@ safesizehint!(x, n) = sizehint!(x, min(4096, n)) # determines whether we'll look up object caches in .keyedvalues or .values iskeyed(::Pool{K}) where {K} = K !== Nothing +""" + keytype(::Pool) + +Return the type of the keys for the pool. +If the pool is not keyed, this will return `Nothing`. +""" Base.keytype(::Type{<:Pool{K}}) where {K} = K Base.keytype(p::Pool) = keytype(typeof(p)) +""" + valtype(::Pool) + +Return the type of the objects that can be stored in the pool. +""" Base.valtype(::Type{<:Pool{<:Any, T}}) where {T} = T Base.valtype(p::Pool) = valtype(typeof(p))