Skip to content

Commit

Permalink
Improve scalability for parallel_for_each feeder
Browse files Browse the repository at this point in the history
Signed-off-by: pavelkumbrasev <[email protected]>
  • Loading branch information
pavelkumbrasev committed Mar 29, 2024
1 parent 78982bd commit 6fc04d1
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions include/oneapi/tbb/parallel_for_each.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,17 @@ struct feeder_item_task: public task {
using feeder_type = feeder_impl<Body, Item>;

template <typename ItemType>
feeder_item_task(ItemType&& input_item, feeder_type& feeder, small_object_allocator& alloc) :
feeder_item_task(ItemType&& input_item, feeder_type& feeder, small_object_allocator& alloc, wait_tree_node_interface* node) :
item(std::forward<ItemType>(input_item)),
my_feeder(feeder),
my_allocator(alloc)
{}
my_allocator(alloc),
m_wait_tree_node(node)
{
m_wait_tree_node->reserve();
}

void finalize(const execution_data& ed) {
my_feeder.my_wait_context.release();
m_wait_tree_node->release();
my_allocator.delete_object(this, ed);
}

Expand Down Expand Up @@ -160,6 +163,7 @@ struct feeder_item_task: public task {
Item item;
feeder_type& my_feeder;
small_object_allocator my_allocator;
wait_tree_node_interface* m_wait_tree_node;
}; // class feeder_item_task

/** Implements new task adding procedure.
Expand All @@ -170,9 +174,8 @@ class feeder_impl : public feeder<Item> {
void internal_add_copy_impl(std::true_type, const Item& item) {
using feeder_task = feeder_item_task<Body, Item>;
small_object_allocator alloc;
auto task = alloc.new_object<feeder_task>(item, *this, alloc);
auto task = alloc.new_object<feeder_task>(item, *this, alloc, r1::get_thread_reference_node(&my_wait_context));

my_wait_context.reserve();
spawn(*task, my_execution_context);
}

Expand All @@ -187,20 +190,19 @@ class feeder_impl : public feeder<Item> {
void internal_add_move(Item&& item) override {
using feeder_task = feeder_item_task<Body, Item>;
small_object_allocator alloc{};
auto task = alloc.new_object<feeder_task>(std::move(item), *this, alloc);
auto task = alloc.new_object<feeder_task>(std::move(item), *this, alloc, r1::get_thread_reference_node(&my_wait_context));

my_wait_context.reserve();
spawn(*task, my_execution_context);
}
public:
feeder_impl(const Body& body, wait_context& w_context, task_group_context &context)
feeder_impl(const Body& body, wait_context_node& w_context, task_group_context &context)
: my_body(body),
my_wait_context(w_context)
, my_execution_context(context)
{}

const Body& my_body;
wait_context& my_wait_context;
wait_context_node& my_wait_context;
task_group_context& my_execution_context;
}; // class feeder_impl

Expand Down Expand Up @@ -263,7 +265,7 @@ struct input_block_handling_task : public task {
using iteration_task_iterator_type = typename input_iteration_task_iterator_helper<Body, Item>::type;
using iteration_task = for_each_iteration_task<iteration_task_iterator_type, Body, Item>;

input_block_handling_task(wait_context& root_wait_context, task_group_context& e_context,
input_block_handling_task(wait_context_node& root_wait_context, task_group_context& e_context,
const Body& body, feeder_impl<Body, Item>* feeder_ptr, small_object_allocator& alloc)
:my_size(0), my_wait_context(0), my_root_wait_context(root_wait_context),
my_execution_context(e_context), my_allocator(alloc)
Expand Down Expand Up @@ -312,7 +314,7 @@ struct input_block_handling_task : public task {
aligned_space<iteration_task, max_block_size> task_pool;
std::size_t my_size;
wait_context my_wait_context;
wait_context& my_root_wait_context;
wait_context_node& my_root_wait_context;
task_group_context& my_execution_context;
small_object_allocator my_allocator;
}; // class input_block_handling_task
Expand All @@ -326,7 +328,7 @@ struct forward_block_handling_task : public task {
using iteration_task = for_each_iteration_task<Iterator, Body, Item>;

forward_block_handling_task(Iterator first, std::size_t size,
wait_context& w_context, task_group_context& e_context,
wait_context_node& w_context, task_group_context& e_context,
const Body& body, feeder_impl<Body, Item>* feeder_ptr,
small_object_allocator& alloc)
: my_size(size), my_wait_context(0), my_root_wait_context(w_context),
Expand Down Expand Up @@ -373,7 +375,7 @@ struct forward_block_handling_task : public task {
aligned_space<iteration_task, max_block_size> task_pool;
std::size_t my_size;
wait_context my_wait_context;
wait_context& my_root_wait_context;
wait_context_node& my_root_wait_context;
task_group_context& my_execution_context;
small_object_allocator my_allocator;
}; // class forward_block_handling_task
Expand Down Expand Up @@ -456,15 +458,15 @@ using feeder_is_required = tbb::detail::void_t<decltype(tbb::detail::invoke(std:
// Creates feeder object only if the body can accept it
template <typename Iterator, typename Body, typename Item, typename = void>
struct feeder_holder {
feeder_holder( wait_context&, task_group_context&, const Body& ) {}
feeder_holder( wait_context_node&, task_group_context&, const Body& ) {}

feeder_impl<Body, Item>* feeder_ptr() { return nullptr; }
}; // class feeder_holder

template <typename Iterator, typename Body, typename Item>
class feeder_holder<Iterator, Body, Item, feeder_is_required<Body, Iterator, Item>> {
public:
feeder_holder( wait_context& w_context, task_group_context& context, const Body& body )
feeder_holder( wait_context_node& w_context, task_group_context& context, const Body& body )
: my_feeder(body, w_context, context) {}

feeder_impl<Body, Item>* feeder_ptr() { return &my_feeder; }
Expand All @@ -475,7 +477,7 @@ class feeder_holder<Iterator, Body, Item, feeder_is_required<Body, Iterator, Ite
template <typename Iterator, typename Body, typename Item>
class for_each_root_task_base : public task {
public:
for_each_root_task_base(Iterator first, Iterator last, const Body& body, wait_context& w_context, task_group_context& e_context)
for_each_root_task_base(Iterator first, Iterator last, const Body& body, wait_context_node& w_context, task_group_context& e_context)
: my_first(first), my_last(last), my_wait_context(w_context), my_execution_context(e_context),
my_body(body), my_feeder_holder(my_wait_context, my_execution_context, my_body)
{
Expand All @@ -489,7 +491,7 @@ class for_each_root_task_base : public task {
protected:
Iterator my_first;
Iterator my_last;
wait_context& my_wait_context;
wait_context_node& my_wait_context;
task_group_context& my_execution_context;
const Body& my_body;
feeder_holder<Iterator, Body, Item> my_feeder_holder;
Expand Down Expand Up @@ -624,11 +626,11 @@ void run_parallel_for_each( Iterator first, Iterator last, const Body& body, tas
{
if (!(first == last)) {
using ItemType = get_item_type<Body, typename std::iterator_traits<Iterator>::value_type>;
wait_context w_context(0);
wait_context_node w_context(0);

for_each_root_task<Iterator, Body, ItemType> root_task(first, last, body, w_context, context);

execute_and_wait(root_task, context, w_context, context);
execute_and_wait(root_task, context, w_context.get_context(), context);
}
}

Expand Down

0 comments on commit 6fc04d1

Please sign in to comment.