Skip to content

Commit

Permalink
using copyto and function barrier in foreachindex
Browse files Browse the repository at this point in the history
  • Loading branch information
anicusan committed Jun 26, 2024
1 parent 7780ff3 commit c032e40
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 31 deletions.
37 changes: 25 additions & 12 deletions src/foreachindex.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,28 @@ function foreachindex(
end


function foreachindex(
# Parallelise `f` over `eachindex(itr)` using Polyester's `@batch` scheduler.
# `min_elems` is the minimum number of elements each batch should process;
# the `@inline` hint encourages specialisation of `f` inside the batched loop.
function _foreachindex_polyester(f, itr, min_elems)
    @batch minbatch=min_elems per=thread for idx in eachindex(itr)
        @inline f(idx)
    end
end


# Parallelise `f` over `eachindex(itr)` using Julia tasks via `task_partition`;
# at most `max_tasks` tasks are spawned and each processes at least `min_elems`
# elements.
function _foreachindex_threads(f, itr, max_tasks, min_elems)
    task_partition(length(itr), max_tasks, min_elems) do chunk
        # `task_partition` hands out static 1:length(itr)-based ranges; route
        # them through `eachindex`, which supports arbitrary index styles (and
        # is compiled away for 1-based collections). Each task runs one chunk.
        indices = eachindex(itr)
        for offset in chunk
            @inbounds index = indices[offset]
            @inline f(index)
        end
    end
end


@inline function foreachindex(
f,
itr,
backend::CPU;
Expand All @@ -31,17 +52,9 @@ function foreachindex(
)
# CPU implementation
if scheduler === :threads
task_partition(length(itr), max_tasks, min_elems) do irange
itr_indices = eachindex(itr)
for i in irange
@inbounds itr_index = itr_indices[i]
@inline f(itr_index)
end
end
_foreachindex_threads(f, itr, max_tasks, min_elems)
elseif scheduler === :polyester
@batch minbatch=min_elems per=thread for i in eachindex(itr)
@inline f(i)
end
_foreachindex_polyester(f, itr, min_elems)
else
throw(ArgumentError("`scheduler` must be `:threads` or `:polyester`. Received $scheduler"))
end
Expand All @@ -52,7 +65,7 @@ end

"""
foreachindex(f, itr, [backend::GPU]; block_size::Int=256)
foreachindex(f, itr, [backend::CPU]; scheduler=:polyester, max_tasks=Threads.nthreads(), min_elems=1)
foreachindex(f, itr, [backend::CPU]; scheduler=:threads, max_tasks=Threads.nthreads(), min_elems=1)
foreachindex(f, itr, backend=get_backend(itr); kwargs...)
Parallelised `for` loop over the indices of an iterable.
Expand Down
2 changes: 1 addition & 1 deletion src/sort/merge_sort.jl
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ function merge_sort!(
end

if isodd(niter)
v .= p1
copyto!(v, p1)
end
end

Expand Down
4 changes: 2 additions & 2 deletions src/sort/merge_sort_by_key.jl
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ function merge_sort_by_key!(
end

if isodd(niter)
keys .= pk1
values .= pv1
copyto!(keys, pk1)
copyto!(values, pv1)
end
end

Expand Down
2 changes: 1 addition & 1 deletion src/sort/merge_sortperm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ function merge_sortperm_lowmem!(
end

if isodd(niter)
ix .= p1
copyto!(ix, p1)
end
end

Expand Down
41 changes: 26 additions & 15 deletions src/task_partitioner.jl
Original file line number Diff line number Diff line change
Expand Up @@ -147,25 +147,36 @@ end
```
"""
function task_partition(f, num_elems, max_tasks=Threads.nthreads(), min_elems=1)
    # Validate at the API boundary so the threaded path can assume sane values.
    num_elems >= 0 || throw(ArgumentError("num_elems must be >= 0"))
    max_tasks > 0 || throw(ArgumentError("max_tasks must be > 0"))
    min_elems > 0 || throw(ArgumentError("min_elems must be > 0"))

    if min(max_tasks, num_elems ÷ min_elems) <= 1
        # Only a single task would be used: skip spawning entirely and run `f`
        # directly on the whole 1-based range.
        @inline f(1:num_elems)
    else
        # Compiler should decide if this should be inlined; threading adds quite
        # a bit of code, it is faster (as seen in Cthulhu) to keep it in a
        # separate self-contained function
        _task_partition_threads(f, num_elems, max_tasks, min_elems)
    end
    nothing
end
# Threaded path of `task_partition`: split the element range into
# `tp.num_tasks` contiguous chunks and run `f` on each chunk in parallel.
# Kept as a separate self-contained function so the serial fast path in
# `task_partition` does not pay the compilation cost of the threading code.
function _task_partition_threads(f, num_elems, max_tasks, min_elems)
    tp = TaskPartitioner(num_elems, max_tasks, min_elems)
    tasks = Vector{Task}(undef, tp.num_tasks - 1)

    # Launch first N - 1 tasks
    @inbounds for i in 1:tp.num_tasks - 1
        tasks[i] = Threads.@spawn f(tp[i])
    end

    # Execute task N on this main thread
    @inbounds f(tp[tp.num_tasks])

    # Wait for the tasks to finish
    @inbounds for i in 1:tp.num_tasks - 1
        wait(tasks[i])
    end
    nothing
end

0 comments on commit c032e40

Please sign in to comment.