Skip to content

Commit

Permalink
1. 修改第二章第二节示例,改为使用 iter_value_t 再增加补充描述和兼容 C++11 的写法
Browse files Browse the repository at this point in the history
2. 修改完成第四章“`future` 与 `std::packaged_task`” 的内容
  • Loading branch information
Mq-b committed Apr 29, 2024
1 parent 7288e4f commit 72c6b94
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 12 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@
node_modules/
.vuepress/.cache/
.vuepress/.temp/
.vuepress/.dist/
.vuepress/.dist/

.vscode/
15 changes: 12 additions & 3 deletions md/02使用线程.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ int main(){
```cpp
template<typename InputIt>
auto sum(InputIt first, InputIt last){
using value_type = std::remove_cvref_t<decltype(*first)>;
using value_type = std::iter_value_t<InputIt>;
std::size_t num_threads = std::thread::hardware_concurrency();
std::ptrdiff_t distance = std::distance(first, last);

Expand Down Expand Up @@ -118,11 +118,20 @@ auto sum(InputIt first, InputIt last){
}
```
> [运行](https://godbolt.org/z/3jePcdGvh)测试。
> [运行](https://godbolt.org/z/7d9Yex8vn)测试。
我们写了这样一个求和函数 `sum`,接受两个迭代器计算它们范围中对象的和。
我们先获取了迭代器所指向的值的类型,定义了一个别名 `value_type`,记得删除 CV 与引用,我们这里使用到的 [`std::remove_cvref`](https://zh.cppreference.com/w/cpp/types/remove_cvref) 是 C++20 引入的。如果希望代码可以在 C++11 的环境运行也可以自行修改。`num_threads` 是当前硬件支持的并发线程的值。[`std::distance`](https://zh.cppreference.com/w/cpp/iterator/distance) 用来计算 first 到 last 的距离,也就是我们要进行求和的元素个数了。
我们先获取了迭代器所指向的值的类型,定义了一个别名 `value_type`,我们这里使用到的 [`std::iter_value_t`](https://zh.cppreference.com/w/cpp/iterator/iter_t) 是 C++20 引入的,[返回类型推导](https://zh.cppreference.com/w/cpp/language/function#.E8.BF.94.E5.9B.9E.E7.B1.BB.E5.9E.8B.E6.8E.A8.E5.AF.BC)是 C++14 引入。如果希望代码可以在 C++11 的环境运行也可以自行修改为:
```cpp
template<typename InputIt,typename value_type = typename std::remove_cv<typename std::remove_reference<decltype(*std::declval<InputIt>())>::type>::type>
value_type sum(InputIt first, InputIt last);
```

> [运行](https://godbolt.org/z/fTcYeKE67)测试。
`num_threads` 是当前硬件支持的并发线程的值。[`std::distance`](https://zh.cppreference.com/w/cpp/iterator/distance) 用来计算 first 到 last 的距离,也就是我们要进行求和的元素个数了。

我们这里的设计比较简单,毕竟是初学,所以只对元素个数大于 **`1024000`** 的进行多线程求和,而小于这个值的则直接使用标准库函数 [`std::accumulate`](https://zh.cppreference.com/w/cpp/algorithm/accumulate) 求和即可。

Expand Down
101 changes: 93 additions & 8 deletions md/04同步操作.md
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ int main(){
通常它会和 `std::future` 一起使用,不过也可以单独使用,我们一步一步来:

```cpp
std::packaged_task<int(int, int)> task([](int a, int b){
std::packaged_task<double(int, int)> task([](int a, int b){
return std::pow(a, b);
});
task(10, 2); // 执行传递的 lambda,但无法获取返回值
Expand All @@ -437,33 +437,118 @@ task(10, 2); // 执行传递的 lambda,但无法获取返回值
如果想要异步的获取返回值,我们需要在调用 `operator()` 之前,让它和 future 关联,然后使用 `future.get()`,也就是:
```cpp
std::packaged_task<int(int, int)> task([](int a, int b){
std::packaged_task<double(int, int)> task([](int a, int b){
return std::pow(a, b);
});
std::future<int>future = task.get_future();
std::future<double>future = task.get_future();
task(10, 2); // 此处执行任务
std::cout << future.get() << '\n'; // 不堵塞,此处获取返回值
```

> [运行](https://godbolt.org/z/vsbe37Pz5)测试。
> [运行](https://godbolt.org/z/799Khvadc)测试。
先关联任务,再执行任务,当我们想要获取任务的返回值的时候,就 `future.get()` 即可。值得注意的是,任务并不会在线程中执行,只是异步罢了。想要在线程中执行异步任务,然后再获取返回值,我们可以这么做:
先关联任务,再执行任务,当我们想要获取任务的返回值的时候,就 `future.get()` 即可。值得注意的是,任务并不会在线程中执行,想要在线程中执行异步任务,然后再获取返回值,我们可以这么做:

```cpp
std::packaged_task<int(int, int)> task([](int a, int b){
std::packaged_task<double(int, int)> task([](int a, int b){
return std::pow(a, b);
});
std::future<int>future = task.get_future();
std::future<double>future = task.get_future();
std::thread t{ std::move(task),10,2 }; // 任务在线程中执行
t.join();

std::cout << future.get() << '\n'; // 并不堵塞,获取任务返回值罢了
```
> [运行](https://godbolt.org/z/xhMob4Gq1)测试。
> [运行](https://godbolt.org/z/85r9db49z)测试。
因为 `task` 本身是重载了 `operator()` 的,是可调用对象,自然可以传递给 `std::thread` 执行,以及传递调用参数。唯一需要注意的是我们使用了 `std::move` ,这是因为 `std::packaged_task` 只能移动,不能复制。
---
简而言之,其实 `std::packaged_task` 也就是一个“包装”类而已,它本身并没什么特殊的,老老实实执行我们传递的任务,且方便我们获取返回值罢了,明确这一点,那么一切都不成问题。
`std::packaged_task` 也可以在线程中传递,在需要的时候获取返回值,而非像上面那样将它自己作为可调用对象:
```cpp
template<typename R, typename...Ts, typename...Args>
requires std::invocable<std::packaged_task<R(Ts...)>&, Args...>
void async_task(std::packaged_task<R(Ts...)>& task, Args&&...args) {
// todo..
task(std::forward<Args>(args)...);
}
int main() {
std::packaged_task<int(int,int)> task([](int a,int b){
return a + b;
});
int value = 50;
std::future<int> future = task.get_future();
// 创建一个线程来执行异步任务
std::thread t{ [&] {async_task(task, value, value); } };
std::cout << future.get() << '\n';
t.join();
}
```

> [运行](https://godbolt.org/z/qdK8GMaGE)测试。
我们套了一个 lambda,这是因为函数模板不是函数,它并非具体类型,没办法直接被那样传递使用,只能包一层了。这只是一个简单的示例,展示可以使用 `std::packaged_task` 作函数形参,然后我们来传递任务进行异步调用等操作。

我们再将第二章实现的并行 `sum` 改成 `std::package_task` + `std::future` 的形式:

```cpp
template<typename InputIt>
auto sum(InputIt first, InputIt last) {
using value_type = std::iter_value_t<InputIt>;
std::size_t num_threads = std::thread::hardware_concurrency();
std::ptrdiff_t distance = std::distance(first, last);

if (distance > 1024000) {
// 计算每个线程处理的元素数量
std::size_t chunk_size = distance / num_threads;
std::size_t remainder = distance % num_threads;

// 存储每个线程要执行的任务
std::vector<std::packaged_task<value_type()>>tasks;
// 和每一个任务进行关联的 future 用于获取返回值
std::vector<std::future<value_type>>futures(num_threads);

// 存储关联线程的线程对象
std::vector<std::thread> threads;

// 制作任务、与 future 关联、启动线程执行
InputIt start = first;
for (std::size_t i = 0; i < num_threads; ++i) {
InputIt end = std::next(start, chunk_size + (i < remainder ? 1 : 0));
tasks.emplace_back(std::packaged_task<value_type()>{[start, end, i] {
return std::accumulate(start, end, value_type{});
}});
start = end; // 开始迭代器不断向前
futures[i] = tasks[i].get_future(); // 任务与 std::future 关联
threads.emplace_back(std::move(tasks[i]));
}

// 等待所有线程执行完毕
for (auto& thread : threads)
thread.join();

// 汇总线程的计算结果
value_type total_sum {};
for (std::size_t i = 0; i < num_threads; ++i) {
total_sum += futures[i].get();
}
return total_sum;
}

value_type total_sum = std::accumulate(first, last, value_type{});
return total_sum;
}
```
> [运行](https://godbolt.org/z/z4PMYaznG)测试。
相比于之前,其实不同无非是定义了 `std::vector<std::packaged_task<value_type()>> tasks` 与 `std::vector<std::future<value_type>> futures` ,然后在循环中制造任务插入容器,关联 tuple,再放到线程中执行。最后汇总的时候写一个循环,`futures[i].get()` 获取任务的返回值加起来即可。
到此,也就可以了。

0 comments on commit 72c6b94

Please sign in to comment.