From c032e400c3d4cac7d7261caa17a32907fdfe8056 Mon Sep 17 00:00:00 2001
From: anicusan
Date: Wed, 26 Jun 2024 12:11:55 +0100
Subject: [PATCH] using copyto and function barrier in foreachindex

---
 src/foreachindex.jl           | 37 +++++++++++++++++++++----------
 src/sort/merge_sort.jl        |  2 +-
 src/sort/merge_sort_by_key.jl |  4 ++--
 src/sort/merge_sortperm.jl    |  2 +-
 src/task_partitioner.jl       | 41 ++++++++++++++++++++++-------------
 5 files changed, 55 insertions(+), 31 deletions(-)

diff --git a/src/foreachindex.jl b/src/foreachindex.jl
index 31900ac..7a53311 100644
--- a/src/foreachindex.jl
+++ b/src/foreachindex.jl
@@ -20,7 +20,28 @@ function foreachindex(
 end
 
 
-function foreachindex(
+function _foreachindex_polyester(f, itr, min_elems)
+    @batch minbatch=min_elems per=thread for i in eachindex(itr)
+        @inline f(i)
+    end
+end
+
+
+function _foreachindex_threads(f, itr, max_tasks, min_elems)
+    task_partition(length(itr), max_tasks, min_elems) do irange
+        # Task partition returns static ranges indexed from 1:length(itr); use those to index into
+        # eachindex, which supports arbitrary indices (and gets compiled away when using 1-based
+        # collections); each thread processes this range
+        itr_indices = eachindex(itr)
+        for i in irange
+            @inbounds itr_index = itr_indices[i]
+            @inline f(itr_index)
+        end
+    end
+end
+
+
+@inline function foreachindex(
     f,
     itr,
     backend::CPU;
@@ -31,17 +52,9 @@ function foreachindex(
 )
     # CPU implementation
     if scheduler === :threads
-        task_partition(length(itr), max_tasks, min_elems) do irange
-            itr_indices = eachindex(itr)
-            for i in irange
-                @inbounds itr_index = itr_indices[i]
-                @inline f(itr_index)
-            end
-        end
+        _foreachindex_threads(f, itr, max_tasks, min_elems)
     elseif scheduler === :polyester
-        @batch minbatch=min_elems per=thread for i in eachindex(itr)
-            @inline f(i)
-        end
+        _foreachindex_polyester(f, itr, min_elems)
     else
         throw(ArgumentError("`scheduler` must be `:threads` or `:polyester`. Received $scheduler"))
     end
@@ -52,7 +65,7 @@ end
 
 """
     foreachindex(f, itr, [backend::GPU]; block_size::Int=256)
-    foreachindex(f, itr, [backend::CPU]; scheduler=:polyester, max_tasks=Threads.nthreads(), min_elems=1)
+    foreachindex(f, itr, [backend::CPU]; scheduler=:threads, max_tasks=Threads.nthreads(), min_elems=1)
    foreachindex(f, itr, backend=get_backend(itr); kwargs...)
 
 Parallelised `for` loop over the indices of an iterable.
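
[Editor's note: for reference, a minimal usage sketch of the entry point refactored above. This is not part of the patch; it assumes the package is imported as `AK`, and the array and kernel below are illustrative.]

```julia
import AcceleratedKernels as AK

x = rand(Float64, 1_000_000)
y = similar(x)

# The do-block becomes `f`; the entry point only dispatches on `scheduler`,
# then hands off to the type-stable `_foreachindex_threads` helper.
AK.foreachindex(x; scheduler=:threads, max_tasks=Threads.nthreads(), min_elems=10_000) do i
    @inbounds y[i] = 2 * x[i] + 1
end
```

The function barrier is the likely motivation for the `@inline` on the entry point: the kwarg-handling method is now small enough to inline away, while the closure-specialized hot loop compiles once per concrete `f`/`itr` pair inside the helper.
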
diff --git a/src/sort/merge_sort.jl b/src/sort/merge_sort.jl
index 3eb011c..3a39119 100644
--- a/src/sort/merge_sort.jl
+++ b/src/sort/merge_sort.jl
@@ -172,7 +172,7 @@ function merge_sort!(
     end
 
     if isodd(niter)
-        v .= p1
+        copyto!(v, p1)
     end
 end
 
diff --git a/src/sort/merge_sort_by_key.jl b/src/sort/merge_sort_by_key.jl
index 47518f0..426dabf 100644
--- a/src/sort/merge_sort_by_key.jl
+++ b/src/sort/merge_sort_by_key.jl
@@ -212,8 +212,8 @@ function merge_sort_by_key!(
     end
 
     if isodd(niter)
-        keys .= pk1
-        values .= pv1
+        copyto!(keys, pk1)
+        copyto!(values, pv1)
     end
 end
 
diff --git a/src/sort/merge_sortperm.jl b/src/sort/merge_sortperm.jl
index 8f9b3a1..b655ae1 100644
--- a/src/sort/merge_sortperm.jl
+++ b/src/sort/merge_sortperm.jl
@@ -120,7 +120,7 @@ function merge_sortperm_lowmem!(
     end
 
     if isodd(niter)
-        ix .= p1
+        copyto!(ix, p1)
     end
 end
 
diff --git a/src/task_partitioner.jl b/src/task_partitioner.jl
index b2a1a0d..0cb4c62 100644
--- a/src/task_partitioner.jl
+++ b/src/task_partitioner.jl
@@ -147,25 +147,36 @@ end
 ```
 """
 function task_partition(f, num_elems, max_tasks=Threads.nthreads(), min_elems=1)
-    tp = TaskPartitioner(num_elems, max_tasks, min_elems)
-    if tp.num_tasks == 1
-        f(1:num_elems)
+    num_elems >= 0 || throw(ArgumentError("num_elems must be >= 0"))
+    max_tasks > 0 || throw(ArgumentError("max_tasks must be > 0"))
+    min_elems > 0 || throw(ArgumentError("min_elems must be > 0"))
+
+    if min(max_tasks, num_elems ÷ min_elems) <= 1
+        @inline f(1:num_elems)
     else
-        tasks = Vector{Task}(undef, tp.num_tasks - 1)
+        # Compiler should decide if this should be inlined; threading adds quite a bit of code, it
+        # is faster (as seen in Cthulhu) to keep it in a separate self-contained function
+        _task_partition_threads(f, num_elems, max_tasks, min_elems)
+    end
+    nothing
+end
 
-        # Launch first N - 1 tasks
-        @inbounds for i in 1:tp.num_tasks - 1
-            tasks[i] = Threads.@spawn f(tp[i])
-        end
 
-        # Execute task N on this main thread
-        @inbounds f(tp[tp.num_tasks])
+function _task_partition_threads(f, num_elems, max_tasks, min_elems)
+    tp = TaskPartitioner(num_elems, max_tasks, min_elems)
+    tasks = Vector{Task}(undef, tp.num_tasks - 1)
 
-        # Wait for the tasks to finish
-        @inbounds for i in 1:tp.num_tasks - 1
-            wait(tasks[i])
-        end
+    # Launch first N - 1 tasks
+    @inbounds for i in 1:tp.num_tasks - 1
+        tasks[i] = Threads.@spawn f(tp[i])
+    end
+
+    # Execute task N on this main thread
+    @inbounds f(tp[tp.num_tasks])
+
+    # Wait for the tasks to finish
+    @inbounds for i in 1:tp.num_tasks - 1
+        wait(tasks[i])
     end
-    nothing
 end
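
[Editor's note: two hedged remarks on the changes above. First, `copyto!(v, p1)` replaces `v .= p1` presumably because broadcast assignment goes through `Base.materialize!` (and, on GPU array types, launches a broadcast kernel), whereas `copyto!` between arrays of matching element type can take a direct memcpy-style path. Second, a standalone sketch of calling `task_partition` directly; the reduction below is illustrative, not from the patch.]

```julia
x = rand(1_000_000)
out = similar(x)

# Each invocation of the do-block receives a contiguous 1-based index range;
# the ranges tp[i] are disjoint, so the unsynchronized writes below are safe.
task_partition(length(x), Threads.nthreads(), 10_000) do irange
    for i in irange
        @inbounds out[i] = 2 * x[i]
    end
end
```

Note the new early exit: with `min_elems = 10_000`, an input shorter than 20_000 elements gives `min(max_tasks, num_elems ÷ min_elems) <= 1`, so `f(1:num_elems)` runs serially on the calling thread without constructing a `TaskPartitioner` or spawning tasks.
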