Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functions to count object in use and objects in pool #29

Merged
merged 8 commits into from
Nov 1, 2023
Merged
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ConcurrentUtilities"
uuid = "f0e56b4a-5159-44fe-b623-3e5288b988bb"
authors = ["Jacob Quinn <[email protected]>"]
version = "2.2.1"
version = "2.3.0"

[deps]
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Expand Down
77 changes: 54 additions & 23 deletions src/pools.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,84 @@ export Pool, acquire, release, drain!
import Base: acquire, release

"""
Pool{T}(max::Int=4096)
Pool{K, T}(max::Int=4096)
Pool{T}(limit::Int=4096)
Pool{K, T}(limit::Int=4096)

A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects
of type `K`. Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a
of type `K`.

Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a
function that returns a new object of type `T`.
The `key` argument is optional and can be used to lookup objects that match a certain criteria
(a Dict is used internally, so matching is `isequal`).
(a `Dict` is used internally, so matching is `isequal`).

The `max` argument will limit the number of objects
that can be acquired at any given time. If the limit has been reached, `acquire` will
block until an object is returned to the pool via `release`.
The `limit` argument will limit the number of objects that can be in use at any given time.
If the limit has been reached, `acquire` will block until an object is released
via `release`.

By default, `release(pool, obj)` will return the object to the pool for reuse.
`release(pool)` will return the "permit" to the pool while not returning
any object for reuse.
- `release(pool, obj)` will return the object to the pool for reuse.
- `release(pool)` will decrement the number in use but not return any object for reuse.
- `drain!` can be used to remove objects that have been returned to the pool for reuse;
it does *not* release any objects that are in use.

`drain!` can be used to remove any cached objects for reuse, but it does *not* release
any active acquires.
See also `acquire`, `release`, `Pools.limit`, `Pools.in_use`, `Pools.in_pool`, `drain!`.
"""
mutable struct Pool{K, T}
lock::Threads.Condition
max::Int
limit::Int
Comment on lines -29 to +32

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP.jl does a field access of this and now errors in tests (probably the access is only in tests though). Can this rename be reverted? It is very annoying to have some combinations of HTTP and ConcurrentUtils not working together...

cur::Int
keyedvalues::Dict{K, Vector{T}}
values::Vector{T}

function Pool{K, T}(max::Int=4096) where {K, T}
function Pool{K, T}(limit::Int=4096) where {K, T}
T === Nothing && throw(ArgumentError("Pool type can not be `Nothing`"))
x = new(Threads.Condition(), max, 0)
x = new(Threads.Condition(), limit, 0)
if K === Nothing
x.values = T[]
safesizehint!(x.values, max)
safesizehint!(x.values, limit)
else
x.keyedvalues = Dict{K, Vector{T}}()
end
return x
end
end

Pool{T}(max::Int=4096) where {T} = Pool{Nothing, T}(max)
Pool{T}(limit::Int=4096) where {T} = Pool{Nothing, T}(limit)

safesizehint!(x, n) = sizehint!(x, min(4096, n))

# determines whether we'll look up object caches in .keyedvalues or .values
iskeyed(::Pool{K}) where {K} = K !== Nothing

Base.keytype(::Type{<:Pool{K}}) where {K} = K
Base.keytype(p::Pool) = keytype(typeof(p))

Base.valtype(::Type{<:Pool{<:Any, T}}) where {T} = T
Base.valtype(p::Pool) = valtype(typeof(p))
nickrobinson251 marked this conversation as resolved.
Show resolved Hide resolved

"""
Pools.limit(pool::Pool) -> Int

Return the maximum number of objects permitted to be in use at the same time.
See `Pools.in_use(pool)` for the number of objects currently in use.
"""
limit(pool::Pool) = Base.@lock pool.lock pool.limit

"""
Pools.in_use(pool::Pool) -> Int

Return the number of objects currently in use. Less than or equal to `Pools.limit(pool)`.
"""
in_use(pool::Pool) = Base.@lock pool.lock pool.cur

"""
Pools.in_pool(pool::Pool) -> Int

Return the number of objects in the pool available for reuse.
"""
in_pool(pool::Pool) = Base.@lock pool.lock mapreduce(length, +, values(pool.keyedvalues); init=0)
in_pool(pool::Pool{Nothing}) = Base.@lock pool.lock length(pool.values)
Comment on lines +80 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the locking here? IIUC, this trades one race condition (do we read the a field of a struct before someone changes is) for another (do we grab the lock first), not sure if this is worth a potential context switch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think we want to risk values being mutated whilst computing length, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, this simply reads the count field on the Dict instance, seems safe to me

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... and for arrays I thought checking their length is racy but safe

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems unnecessary to rely on implementation details when we could guarantee safety -- what would be the benefit of not locking (given we expect the computation to take nanoseconds)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to grab the lock could cause the task that is to wait if the lock is already held by someone, this would be somewhat unexpected I'd say for just trying to see what limit is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, for limit, yeah i can see that (because there's no public API to change limit, in fact we should probably const it)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading from a mutable field in a struct from multiple threads should be locked.

There shouldn't be any risk of deadlock, as long as there's only one lock involved, so I don't see how this introduces a new kind of race condition.

If you're worried about increased contention, you could change these fields to be atomics instead. In practice usually a single global lock on a struct is fine until you can show otherwise with a contention profile. But it could make sense to switch to atomics here if you're worried about that. (The tradeoff is that the atomics add some minor overhead, and if you are mostly mutating/reading these fields when you would have had the lock anyway, then the single lock would've been more performant.)

Finally, I think these fields are mostly only used from our metrics logging in our server, so it doesn't matter if you block on a lock here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading from a mutable field in a struct from multiple threads should be locked.

Well, I'm asking which bad things could happen if you don't lock. Like, could the array length value only be partially updated? That would be a good reason to lock.

I don't see how this introduces a new kind of race condition

I meant that grabbing the lock is "racy" in the sense that sometimes it will be the reader who wins the race and sometimes the mutator (so assuming no "partial" values can be read, it won't be a big improvement).

it doesn't matter if you block on a lock here

I think avoiding work that is not needed is a good principle to follow


"""
drain!(pool)

Expand All @@ -72,7 +103,7 @@ end
TRUE(x) = true

@noinline keyerror(key, K) = throw(ArgumentError("invalid key `$key` provided for pool key type $K"))
@noinline releaseerror() = throw(ArgumentError("cannot release permit when pool is empty"))
@noinline releaseerror() = throw(ArgumentError("cannot release when no objects are in use"))

# NOTE: assumes you have the lock!
function releasepermit(pool::Pool)
Expand All @@ -92,19 +123,19 @@ The `forcenew` keyword argument can be used to force the creation of a new objec
The `isvalid` keyword argument can be used to specify a function that will be called to determine if an object is still valid
for reuse. By default, all objects are considered valid.
If there are no objects available for reuse, `f` will be called to create a new object.
If the pool is already at its maximum capacity, `acquire` will block until an object is returned to the pool via `release`.
If the pool is already at its usage limit, `acquire` will block until an object is returned to the pool via `release`.
"""
function Base.acquire(f, pool::Pool{K, T}, key=nothing; forcenew::Bool=false, isvalid::Function=TRUE) where {K, T}
key isa K || keyerror(key, K)
Base.@lock pool.lock begin
# first get a permit
while pool.cur >= pool.max
while pool.cur >= pool.limit
wait(pool.lock)
end
pool.cur += 1
# now see if we can get an object from the pool for reuse
if !forcenew
objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.max), pool.keyedvalues, key) : pool.values
objs = iskeyed(pool) ? get!(() -> safesizehint!(T[], pool.limit), pool.keyedvalues, key) : pool.values
while !isempty(objs)
obj = pop!(objs)
isvalid(obj) && return obj
Expand All @@ -126,10 +157,10 @@ end
release(pool::Pool{K, T}, obj::T)
release(pool::Pool{K, T})

Return an object to a `pool`, optionally keyed by the provided `key`.
Release an object from usage by a `pool`, optionally keyed by the provided `key`.
If `obj` is provided, it will be returned to the pool for reuse.
Otherwise, if `nothing` is returned, or `release(pool)` is called,
just the "permit" will be returned to the pool.
the usage count will be decremented without an object being returned to the pool for reuse.
"""
function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T}
key isa K || keyerror(key, K)
Expand Down
99 changes: 94 additions & 5 deletions test/pools.jl
Original file line number Diff line number Diff line change
@@ -1,24 +1,49 @@
using ConcurrentUtilities, Test
using ConcurrentUtilities.Pools, Test

@testset "Pools" begin
pool_size = length∘Pools.values
@testset "nonkeyed and pool basics" begin
pool = Pool{Int}(3)
@test keytype(pool) === Nothing
@test valtype(pool) === Int

@test Pools.limit(pool) == 3
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 0

# acquire an object from the pool
x1 = acquire(() -> 1, pool)
# no existing objects in the pool, so our function was called to create a new one
@test x1 == 1
@test Pools.limit(pool) == 3
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

# release back to the pool for reuse
release(pool, x1)
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 1

# acquire another object from the pool
x1 = acquire(() -> 2, pool)
# this time, the pool had an existing object, so our function was not called
@test x1 == 1
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

# but now there are no objects to reuse again, so the next acquire will call our function
x2 = acquire(() -> 2, pool)
@test x2 == 2
@test Pools.in_use(pool) == 2
@test Pools.in_pool(pool) == 0

x3 = acquire(() -> 3, pool)
@test x3 == 3
# the pool is now at capacity, so the next acquire will block until an object is released
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 0

# the pool is now at `Pools.limit`, so the next acquire will block until an object is released
@test Pools.in_use(pool) == Pools.limit(pool)
tsk = @async acquire(() -> 4, pool; forcenew=true)
yield()
@test !istaskdone(tsk)
Expand All @@ -28,60 +53,110 @@ using ConcurrentUtilities, Test
x1 = fetch(tsk)
# even though we released 1 for reuse, we passed forcenew, so our function was called to create new
@test x1 == 4
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to try and provide a key to a non-keyed pool
@test_throws ArgumentError acquire(() -> 1, pool, 1)

# release objects back to the pool
release(pool, x1)
release(pool, x2)
release(pool, x3)
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 4

# acquire an object, but checking isvalid
x1 = acquire(() -> 5, pool; isvalid=x -> x == 1)
@test x1 == 1
@test Pools.in_use(pool) == 1

# no valid objects, so our function was called to create a new one
x2 = acquire(() -> 6, pool; isvalid=x -> x == 1)
@test x2 == 6
# we have one slot left in the pool, we now throw while creating new
@test Pools.in_use(pool) == 2

# we have one permit left, we now throw while creating a new object
# and we want to test that the permit isn't permanently lost for the pool
@test_throws ErrorException acquire(() -> error("oops"), pool; forcenew=true)
@test Pools.in_use(pool) == 2

# we can still acquire a new object
x3 = acquire(() -> 7, pool; forcenew=true)
@test x3 == 7
@test Pools.in_use(pool) == 3

# release objects back to the pool
drain!(pool)
release(pool, x1)
release(pool, x2)
release(pool, x3)
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 3

# try to do an invalid release
@test_throws ArgumentError release(pool, 10)

# test that the invalid release didn't push the object to our pool for reuse
x1 = acquire(() -> 8, pool)
@test x1 == 7
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 2
# calling drain! removes all objects for reuse
drain!(pool)
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

x2 = acquire(() -> 9, pool)
@test x2 == 9
@test Pools.in_use(pool) == 2
@test Pools.in_pool(pool) == 0
end

@testset "keyed pool" begin
# now test a keyed pool
pool = Pool{String, Int}(3)
@test keytype(pool) === String
@test valtype(pool) === Int

@test Pools.limit(pool) == 3
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 0

# acquire an object from the pool
x1 = acquire(() -> 1, pool, "a")
# no existing objects in the pool, so our function was called to create a new one
@test x1 == 1
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

# release back to the pool for reuse
release(pool, "a", x1)
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 1

# test for a different key
x2 = acquire(() -> 2, pool, "b")
# there's an existing object, but for a different key, so we don't reuse
@test x2 == 2
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 1

# acquire another object from the pool
x1 = acquire(() -> 2, pool, "a")
# this time, the pool had an existing object, so our function was not called
@test x1 == 1
@test Pools.in_use(pool) == 2
@test Pools.in_pool(pool) == 0

x3 = acquire(() -> 3, pool, "a")
@test x3 == 3
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 0

# the pool is now at capacity, so the next acquire will block until an object is released
# even though we've acquired using different keys, the capacity is shared across the pool
@test Pools.in_use(pool) == Pools.limit(pool)
tsk = @async acquire(() -> 4, pool, "c"; forcenew=true)
yield()
@test !istaskdone(tsk)
Expand All @@ -91,13 +166,27 @@ using ConcurrentUtilities, Test
x1 = fetch(tsk)
# even though we released 1 for reuse, we passed forcenew, so our function was called to create new
@test x1 == 4
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to try and provide an invalid key to a keyed pool
@test_throws ArgumentError acquire(() -> 1, pool, 1)
# error to release an invalid key back to the pool
@test_throws KeyError release(pool, "z", 1)
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to *not* provide a key to a keyed pool
@test_throws ArgumentError acquire(() -> 1, pool)
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to *not* provide a key when releasing to a keyed pool
@test_throws ArgumentError release(pool)
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to release an invalid key back to the pool
@test_throws KeyError release(pool, "z", 1)
@test_broken Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1
end
end
Loading