目录
1. 引言
2. promise/future的含义
std::future
std::promise
std::packaged_task
std::async
处理异常
std::shared_future
实战:多线程实现快速排序
时钟与限定等待时间
参考:
1. 引言
在并发编程中,我们通常会用到一组非阻塞的模型:promisefuture。在python、js、java中都提供futurepromise,是现代语言常用的非阻塞编程模型。
先来看一个经典的烧茶水问题。烧茶水有三步:烧开水、买茶叶、泡茶叶。为节约时间,烧开水可以和买茶叶同时进行,泡茶叶必须在前两者完成后才能进行。
许多场景都可以抽象成烧茶水这样的异步模型,比如:
IO请求。cpu拉起io任务后,继续做手上的工作,io任务完成后,修改某个信号,cpu每执行一条指令都查看该信号是否改变,若检测到改变,再做响应后处理。
2. promise/future的含义
感性理解:
- promise: 承诺, 我"承诺"(如利用子线程)去执行一个子任务了, 什么时候执行我来决定, 你等我的结果就好.
- future: 期望, 作为执行子任务的结果存储容器, 从我这里可以拿到子线程的期望结果就好.
语法上的理解:
std::promise 对象可以保存某一类型 T 的值,该值可被 future 对象读取(可能在另外一个线程中),因此 std::promise 提供了一种线程同步的手段。在 std::promise 对象构造时可以和一个共享状态(通常是std::future)相关联,并可以在相关联的共享状态(std::future)上保存一个类型为 T 的值。
可以通过 get_future 来获取与该 std::promise 对象相关联的 std::future 对象,调用该函数之后,两个对象共享相同的共享状态。
- std::promise对象是异步提供者,它可以在某一时刻设置共享状态的值。
- std::future对象是异步返回者, 获取共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标志变为 ready,然后才能获取共享状态的值。
下面我们正式介绍futurepromise的含义:
- future表示一个可能还没有实际完成的异步任务的结果,针对这个结果, 可以添加回调函数以便在任务执行成功或失败后做出对应的操作;
- promise交由任务执行者使用,任务执行者通过promise可以标记任务完成或者失败;
- 交互流程:
- 在父进程中: 定义promise, 通过promise.get_future() 获取对应的future,
- 在父进程中将promise传递给子进程;然后进行非阻塞地干自己(父进程)的事情,
- 在子进程中通过 promise->set_value,为共享状态设置处理后的值;
- 然后在父进程中通过future.get()获取共享状态设置处理后的值
所以说,futurepromise编程模型本质上还是message pass(任务线程与主线程消息传递)。在future模型中阻塞和非阻塞都有:拉起一个新线程(非阻塞),在主线程
message pass的编程范式,我们可见多了,先来思考一下有哪几种编写方法:
- 利用条件变量。在任务线程完成时调用
notify_one() ,在主函数中调用wait() ; - 利用flag(原子类型)。在任务完成时修改flag,在主线程中阻塞,不断轮询flag直到成功;
上面第一种上锁会带来一定开销,好处是适合长时间阻塞,第二种适合短时间阻塞。
那么c++11 future采用哪一种呢?答案是第二种,future内定义了一个原子对象,主线程通过自旋锁不断轮询,此外会进行sys_futex系统调用。futex是linux非常经典的同步机制,锁冲突时在用户态利用自旋锁,而需要挂起等待时到内核态进行睡眠与唤醒。
其实future/promise最强大的功能是能够:
- 获得结果返回值;
- 处理异常(如果任务线程发生异常);
- 链式回调(目前c++标准库不支持链式回调,不过folly支持);
获得结果返回值
获得结果返回值的方法有很多,但都不如future/promise优雅。
我们也可以提供拉起一个新的
#include <chrono> #include <thread> #include <iostream> ? void threadCompute(int* res) { std::this_thread::sleep_for(std::chrono::seconds(1)); *res = 100; } ? int main() { int res; std::thread th1(threadCompute, 2, &res); th1.join(); std::cout << res << std::endl; return 0; }
用std::thread的缺点是:
- 通过
.join 来阻塞,本文例子比较简单,但代码一长,线程一多,忘记调用th1.join() ,就会捉襟见肘; - 使用指针传递数据非常危险,因为互斥量不能阻止指针的访问,而且指针的方式要更改接口,比较麻烦 ;
那么future/promise又是如何获得返回值的呢?通过future,future可以看成存储器,存储一个未来返回值。
先在主线程内创建一个promise对象,从promise对象中获得future对象;
再将promise引用传递给任务线程,在任务线程中对promise进行
std::future
此外,
可以通过下面三个方式来获得
std::promise 的get_future函数std::packaged_task 的get_future函数std::async 函数
std::promise
来看一个例子:
#include <iostream> #include <functional> #include <future> #include <thread> #include <chrono> #include <cstdlib> ? void sub_thread_task(std::promise<int>& promiseObj) { std::this_thread::sleep_for(std::chrono::seconds(1)); promiseObj.set_value(100); // set_value后,future变为就绪。 } ? int main() { std::promise<int> promiseObj; std::future<int> futureObj = promiseObj.get_future(); // 采用std::ref引用传值 std::thread t(&sub_thread_task, std::ref(promiseObj)); // 会阻塞 ,获取set_value的 int 类型值 100 std::cout << futureObj.get() << std::endl; t.join(); return 0; }
std::packaged_task
因为
- 封装在 std::function 对象中;
- 作为线程函数传递到 std::thread 对象中;
- 作为可调用对象传递另一个函数中;
- 可以直接进行调用 ;
我们经常用 std::packaged_task 打包任务, 并在它被传到别处之前的适当时机取回期望值。
下面我们来编写一个GUI界面线程。GUI往往需要一个线程去轮询任务队列,看是否需要处理任务;还有一个线程处理鼠标等IO响应,把
1. #include <deque> 2. #include <mutex> 3. #include <future> 4. #include <thread> 5. #include <utility> 6. 7. std::mutex m; 8. std::deque<std::packaged_task<void()> > tasks; 9. 10. bool gui_shutdown_message_received(); 11. void get_and_process_gui_message(); 12. 13. void gui_thread() // 1 14. { 15. while(!gui_shutdown_message_received()) // 如果用户关闭界面,就退出 16. { 17. get_and_process_gui_message(); // get用户操作 18. std::packaged_task<void()> task; 19. { 20. std::lock_guard<std::mutex> lk(m); // 上局部锁 21. if(tasks.empty()) // 轮询直到不为空 22. continue; 23. task=std::move(tasks.front()); // 取FIFO任务队列第一个 24. tasks.pop_front(); 25. } 26. task(); // task是packaged_task,执行该任务,并把返回值给future对象 27. } 28. } 29. 30. std::thread gui_bg_thread(gui_thread); // 启动后台线程 31. 32. template<typename Func> 33. std::future<void> post_task_for_gui_thread(Func f) 34. { 35. std::packaged_task<void()> task(f); // 作为回调函数 36. std::future<void> res=task.get_future(); // 获得future对象 37. std::lock_guard<std::mutex> lk(m); 38. tasks.push_back(std::move(task)); // 放入任务对列 39. return res; // future对象后续将得到task的返回值 40. }
std::async
我们可以用
int Sum (int a, int b){ return a + b; } int Sum_with_MultiThread(int from, int to, size_t thread_num) { int ret = 0; int n = thread_num ? (to - from) / thread_num : (to - from); std::vector<std::future<int64_t>> v; for (; from <= to; ++from) { v.push_back(std::async(Sum, from, from + n > to ? to : from + n)); from += n; } for (auto &f : v) { ret += f.get(); } return ret; } ?
此外,
而
std::launch::deferred :
表示线程入口函数调用被延迟到std::future 对象调用wait() 或者get() 函数 调用才执行。
如果wait() 和get() 没有调用,则不会创建新线程,也不执行函数;
如果调用wait() 和get() ,实际上也不会创建新线程,而是在主线程上继续执行;std::launch::async :
表示强制这个异步任务在 新线程上执行,在调用std::async() 函数的时候就开始创建线程。std::launch::deferred|std::launch::async :
这里的“|”表示或者。如果没有给出launch参数,默认采用该种方式。
操作系统会自行评估选择async or defer,如果系统资源紧张,则采用defer,就不会创建新线程。避免创建线程过长,导致崩溃。
嘶,async默认的launch方式将由操作系统决定,这样好处是不会因为开辟线程太多而崩溃,但坏处是这种不确定性会带来问题,参考《effective modern c++》:
note:所以如果我们确定是异步执行的话,最好显示给出launch方式!
std::async在使用时不仅要注意launch的不确定性,还有一个坑:async返回的future对象的析构是异步的。
见下面代码,当async返回的future对象是右值时,要进行析构,此时阻塞了。至于为什么要阻塞析构,感兴趣的可以google
#include <iostream> #include <future> #include <thread> #include <chrono> ? int main() { std::cout << "Test 1 start" << std::endl; auto fut1 = std::async( std::launch::async, [] { std::this_thread::sleep_for( std::chrono::milliseconds(5000)); std::cout << "work done 1! "; return 1; } ); // 这一步没有阻塞,因为async的返回的future对象用于move构造了fut1,没有析构 std::cout << "Work done - implicit join on fut1 associated thread just ended "; std::cout << "Test 2 start" << std::endl; std::async(std::launch::async, [] { std::this_thread::sleep_for( std::chrono::milliseconds(5000)); std::cout << "work done 2!" << std::endl; } );// 这一步竟然阻塞了!因为async返回future对象是右值,将要析构,而析构会阻塞 std::cout << "This shold show before work done 2!?" << std::endl; return 0; }
处理异常
期望编程范式的一大好处是能够接住异常,这是
std::async 处理异常
future.get()可以获得async中的异常,外部套一个try/catch。至于是原始的异常对象, 还是一个拷贝,不同的编译器和库将会在这方面做出不同的选择 。
void foo() { std::cout << "foo()" << std::endl; throw std::runtime_error("Error"); } ? int main() { try { std::cout << "1" << std::endl; auto f = std::async(std::launch::async, foo); f.get(); std::cout << "2" << std::endl; } catch (const std::exception& ex) { std::cerr << ex.what() << std::endl; } }
std::packaged_task 处理异常
std::promise 处理异常
try{ some_promise.set_value(calculate_value()); } catch(...){ some_promise.set_exception(std::current_exception()); }
此外,任何情况下, 当期望值的状态还不是“就绪”时, 调用
// std::packaged_task<> std::future<void> future; try { // 提前销毁task { std::packaged_task<void()> task([] { std::cout << "do packaged task." << std::endl; }); future = task.get_future(); } future.get(); } catch (const std::future_error &e) { std::cout << "Packaged task exception: " << e.what() << std::endl; } ? // std::promise<> try { // 提前销毁promise { std::promise<void> promise; future = promise.get_future(); } future.get(); } catch (const std::future_error &e) { std::cout << "Promise exception: " << e.what() << std::endl; }
std::shared_future
注意,每个线程都拥有自己对应的拷贝对象,这样就不会有data race的问题,多个线程访问共享同步结果是安全的。
那么如何创建一个
如下:
std::promise<int> p; std::future<int> f(p.get_future()); assert(f.valid()); // 1 期望值 f 是合法的,说明f此时有所有权 std::shared_future<int> sf(std::move(f)); assert(!f.valid()); // 2 期望值 f 现在是不合法的,说明已经转移所有权 assert(sf.valid()); // 3 sf 现在是合法的,说明获得了所有权
当然,我们也可以用std::promise.get_future来构建shared_future
std::promise<std::string> p; std::shared_future<std::string> sf(p.get_future()); // 右值构造函数,转移所有权 // 等价于 auto sf=p.get_future().share();
实战:多线程实现快速排序
快排相比都很熟悉,下面先看一下普通的顺序执行版本:
template<typename T> std::list<T> sequential_quick_sort(std::list<T> input) { if(input.empty()) { return input; } std::list<T> result; result.splice(result.begin(),input,input.begin()); // splice是剪切操作,把input.begin()元素剪切到result.begin()位置 T const& pivot=*result.begin(); // 选定基准 ? auto divide_point=std::partition(input.begin(),input.end(), [&](T const& t){return t<pivot;}); // 找到比基准小的点 ? std::list<T> lower_part; lower_part.splice(lower_part.end(),input,input.begin(),divide_point); //把input.begin()到divide_point,剪切到lower_part.end()处 auto new_lower(sequential_quick_sort(std::move(lower_part))); // 递归 auto new_higher(sequential_quick_sort(std::move(input))); // 递归 result.splice(result.end(),new_higher); // 合并higher result.splice(result.begin(),new_lower); // 合并lower return result; }
那么上面版本哪里可以并行呢?new_lower和new_higher可以并行
我们只需更改部分代码为:
std::future<std::list<T> > new_lower( std::async(¶llel_quick_sort<T>,std::move(lower_part))); // 异步 auto new_higher(parallel_quick_sort(std::move(input))); // 同步 ? result.splice(result.end(),new_higher); // result.splice(result.begin(),new_lower.get()); //
采用async我们可以充分利用多核,而且async还有一个好处是,系统会帮你决定是否创建新线程,这可以避免因线程数太多而导致的崩溃。
比如,十次递归调用,将会创建1024个执行线程。当运行库认为任务过多时(已影响性能),这些任务应该在使用get()函数获取的线程上运行,而不是在新线程上运行,这样就能避免任务向线程传递的开销。
当然上面的代码也可以用线程池来实现,此外,对于快排有更高级的多线程模型,这里的例子不是最理想的。
时钟与限定等待时间
C++11标准中的
主要定义了三种类型:
durations 时间间隔;clocks 时钟。包括system_clock 、steady_clock 、high_resolution_clock ;time points 时间点;
之前介绍的很多同步模型都支持wait_for、wait_utill两个方法,当发生超时,不再等待。如:
std::future<int> fut = std::async(do_some_thing); std::chrono::milliseconds span(100); // durations fut.wait_for(span); // 参数是时间间隔
发布于 2022-08-13 15:10
参考:
c++ 并发std::future与std::promise - 知乎
C++多线程编程:期望future - 知乎