C++ 并发编程实战(第一版)¶
译文地址:https://github.com/xiaoweiChen/Cpp_Concurrency_In_Action
阅读地址:https://www.bookstack.cn/books/Cpp_Concurrency_In_Action
第二版译文地址:https://github.com/xiaoweiChen/CPP-Concurrency-In-Action-2ed-2019
第三方笔记(第二版):https://github.com/downdemo/Cpp-Concurrency-in-Action-2ed
学习C++11/14: http://www.bogotobogo.com/cplusplus/C11
chap1: 你好,C++的并发世界¶
何谓并发¶
计算机领域的并发指的是在单个系统里同时执行多个独立的任务,而非顺序的进行一些活动。
- 单核机器的任务切换实现并发:通过“这个任务做一会,再切换到别的任务,再做一会儿”的方式,让任务看起来是并行执行的。这种方式称为任务切换。
- 多核处理器的并发是硬件并发(hardware concurrency):真正的并发(这里面准确说是并行,并行是并发的一种情况)
并发的途径¶
多进程并发¶
多进程并发是将应用程序分为多个独立的进程,它们在同一时刻运行。独立的进程间可以通过进程间常规的通信渠道传递讯息(信号、套接字、文件、管道等等)
多进程并发缺点有: - 进程间通信慢:进程之间的通信通常不是设置复杂,就是速度慢,这是因为操作系统会在进程间提供了一定的保护措施,以避免一个进程去修改另一个进程的数据 - 运行进程的开销高:运行多个进程所需的固定开销比较大,需要时间启动进程,操作系统需要内部资源来管理进程。
多线程并发优点:
- 可以更容易编写安全的并发代码:一个进程crash 掉了不影响其他进程。
- 可以通过 socket 等方式进行远程连接,在不同的机器上运行独立的进程。
多线程并发¶
多线程并发,指的是在单个进程中运行多个线程。线程很像轻量级的进程:每个线程相互独立运行,且线程可以在不同的指令序列中运行。但是,进程中的所有线程都共享地址空间,并且所有线程访问到大部分数据———全局变量仍然是全局的,指针、对象的引用或数据可以在线程之间传递。
多线程并发优点:
- 多线程相关开销远小于多个进程:因为地址空间共享的,线程进行切换时候,不涉及 TLB 方面内容。
- 共享内存具有灵活性,缺点需要保证每个线程所访问到的数据的一致性。
为什么使用并发?¶
使用并发的目的主要有两个:
- 关注点分离(SOC)
- 性能
为了分离关注点¶
编写软件时,分离关注点是个好主意;通过将相关的代码与无关的代码分离,可以使程序更容易理解和测试,从而减少出错的可能性。即使一些功能区域中的操作需要在同一时刻发生的情况下,依旧可以使用并发分离不同的功能区域;若不显式地使用并发,就得编写一个任务切换框架,或者在操作中主动地调用一段不相关的代码。
为了性能¶
另一个使用并发的原因是为了性能。有两种并发手段可以提高性能:
- 任务并行(task parallelism):将一个单个任务分成几部分,且各自并行运行,从而降低总运行时间。
- 数据并行(data parallelism):将一个数据集分成几份,且各自并行处理,从而降低总运行时间,带来吞吐量的提升。
开始入门¶
你好,并发世界¶
#include <iostream>
#include <thread>
using namespace std;
void hello() { cout << "Hello Concurrent World!" << endl; }
int main() {
thread t(hello);
t.join();
return 0;
}
chap2: 线程管理¶
基础¶
启动线程¶
线程创建时候,需要为线程设置启动函数。
示例 1:使用普通函数
示例 2: 使用函子(也称函数对象)
#include <iostream>
#include <thread>
using namespace std;
class backgroud_task {
public:
void operator()() const { std::cout << "background task\n"; };
};
int main() {
backgroud_task f;
thread my_thread(f);
my_thread.join();
// thread my_thread2(backgroud_task()); //
// 注意此时传入的是函数声明,而不是函数对象
// my_thread2.join();
thread my_thread2((backgroud_task()));
my_thread2.join();
thread my_thread3{backgroud_task()};
my_thread3.join();
class backgroud_task fn = backgroud_task();
thread my_thread4(fn);
my_thread4.join();
// 使用闭包
thread my_thread5([] { std::cout << "background task\n"; });
my_thread5.join();
}
使用函数对象时候,需要注意不用错误地使用了函数声明,而应该使用函数对象:
创建线程之后,如果不等待线程完成,可能会发生线程访问旧的局部变量:
#include <iostream>
#include <thread>
using namespace std;
void do_something(int i) {
++i;
cout << i << endl;
};
struct func {
int &i;
func(int &i_) : i(i_) {}
void operator()() {
for (unsigned j = 0; j < 100; ++j) {
do_something(i); // 潜在访问隐患:悬空引用
}
}
};
void oops() {
int some_local_state = 0;
func my_func(some_local_state);
thread t(my_func);
t.detach(); // 不等待新线程运行结束,当 oops
// 结束后,新线程还会访问到局部变量some_local_state
}
int main() {
oops();
this_thread::sleep_for(std::chrono::milliseconds(500)); // 休眠 500 毫秒
}
等待线程完成¶
如果需要等待线程结束,可以使用join()函数。join()函数会阻塞当前线程,直到线程完成。对于一个线程,join()只能使用一次,再次使用会发生异常。我们可以使用 joinable()函数来检查线程是否可以join。
当主线程没有等待到线程完成,程序就会退出,此时线程会运行 teriminate() 来结束掉线程。
特殊情况下的等待¶
当在 join 调用之前,程序抛出异常时,会导致无法等待线程完成。
解决办法 1 是在 catch 块中调用 join。
#incldue "func_header.hpp"
void f()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread t(my_func);
try
{
do_something_in_current_thread();
}
catch(...)
{
t.join(); // 1
throw;
}
t.join(); // 2
}
另外一种方式是使用 RAII(Resource Acquisition Is Initialization,资源获取即初始化方式),提供一个类,在析构函数中使用 join:
#include "func_header.hpp"
class thread_guard
{
std::thread& t;
public:
explicit thread_guard(std::thread& t_):// 1
t(t_)
{}
~thread_guard()
{
if(t.joinable()) // 2
{
t.join(); // 3
}
}
thread_guard(thread_guard const&)=delete; // 4
thread_guard& operator=(thread_guard const& =delete; // 5
};
void f()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t);
do_something_in_current_thread();
}
上面代码解释说明:
-
1处的 explicit 关键字,防止隐式转换,当类的构造函数中只有一个参数时候,默认支持隐式转换的:
-
2,3处,如果线程没有 join,那么在析构函数中会调用 join,否则不会。
- 4,5处,防止拷贝构造函数和赋值操作符被调用。目的是为了不让编译器自动生成它们。直接对一个对象进行拷贝或赋值是危险的,因为这可能会弄丢已经加入的线程。通过删除声明,任何尝试给thread_guard对象赋值的操作都会引发一个编译错误。
后台运行线程¶
可以通过在线程对象中调用detach()函数,将线程设置为后台线程,此后这个对象也是无法加入的了。
std::thread t(do_background_work);
t.detach(); // 将线程设置为后台运行
assert(!t.joinable()); // 该线程无法join 了,此处一定断言成功
向线程函数传递参数¶
std::thread 线程函数可以接受任意个参数,这些参数会传递给线程函数。std::thread 中第一个参数是线程函数,其余参数是该线程函数的参数。
线程传参的两类问题¶
传递了局部变量到线程中,造成悬垂引用¶
C++字符串类型 std::string
(因为std::string
是整个生命周期内都有效的对象) 可以安全的传递给线程独立内存中:
hello
类型是 char const *
类型,当会在线程的上下文中完成字面值向 std::string
对象的转换。
局部变量造成的问题的示例:
void f(int i,std::string const& s);
void oops(int some_param)
{
char buffer[1024]; // 局部变量
sprintf(buffer, "%i",some_param);
std::thread t(f,3,buffer); // 当 oops 执行完成之后,buffer变成了悬垂指针
t.detach();
}
当 oops 执行完成之后,局部变量 buffer 会释放,线程 t 再次访问时候会导致悬垂引用。解决办法可以将 buffer 转换成 std::string 类型:
void f(int i,std::string const& s);
void not_oops(int some_param)
{
char buffer[1024];
sprintf(buffer,"%i",some_param);
std::thread t(f,3,std::string(buffer)); // 使用std::string,避免悬垂指针
t.detach();
}
期待传递一个引用,却传递了一个值对象¶
void update_data_for_widget(widget_id w,widget_data& data); // 1
void oops_again(widget_id w)
{
widget_data data;
std::thread t(update_data_for_widget,w,data); // 2
display_status();
t.join();
process_widget_data(data); // 3
}
上面代码中尽管 1 处期待第二个参数是传入一个引用,但 2 处的 std::thread 的构造函数并不知晓,它将无视函数 prcess_widget_data 期待的参数类型,而是将 data 的副本作为第二个参数传递给 update_data_for_widget。这就导致在线程函数中修改了 data,而 oops_again 外面访问到的data 数据依旧没有变。
解决办法是使用 std::ref 将参数转换成引用类型:
类成员函数作为线程函数¶
我们可以传递一个类成员函数指针作为线程函数,第一个参数是对象指针,第二个参数是函数参数,以此类推。
#include <iostream>
#include <thread>
class X {
public:
void do_lengthy_work(int);
};
void X::do_lengthy_work(int num) {
std::cout << "do_lengthy_work: " << num << std::endl;
}
int main() {
X my_x;
int num(0);
std::thread t(&X::do_lengthy_work, &my_x, num);
my_x.do_lengthy_work(10);
t.join();
}
移动对象到线程中¶
线程函数中参数默认是拷贝过去的,但我们可以借助 std::unique_ptr 来实现移动,将参数移动到线程函数中:
void process_big_object(std::unique_ptr<big_object>);
std::unique_ptr<big_object> p(new big_object);
p->prepare_data(42);
std::thread t(process_big_object,std::move(p));
对于函数参数或函数返回的类型声明了右值引用类型时候,若原对象是一个临时变量时,自动进行移动操作,若原对象是一个命名变量,那么转移的时候就需要使用std::move()进行显示移动。
转移线程所有权¶
C++标准库中有很多资源占有(resource-owning)类型,比如std::ifstream
,std::unique_ptr
,std::thread
等,它们都是 可移动,但不可拷贝的。
“可移动,但不可拷贝”如何理解?
在 C++ 中,"可移动,但不可拷贝" 是指一个对象可以被移动,但不能被拷贝(复制)。这是 C++11 引入移动语义后的一个常见设计模式,特别适用于管理资源所有权(如动态内存、文件句柄、网络连接等)的类。这种设计模式的核心思想是:
-
不可拷贝(复制):类的拷贝构造函数和拷贝赋值操作符被删除或私有化,使得对象不能被拷贝。这是通过显式地删除这些函数来实现的,如下所示:
class NonCopyable { public: NonCopyable() = default; ~NonCopyable() = default; // 删除拷贝构造函数 NonCopyable(const NonCopyable&) = delete; // 删除拷贝赋值操作符 NonCopyable& operator=(const NonCopyable&) = delete; };
在这个例子中,
NonCopyable
类的拷贝构造函数和拷贝赋值操作符都被删除了,这意味着你不能通过拷贝来创建这个类的额外实例。 -
可移动:类提供移动构造函数和移动赋值操作符,允许对象在资源转移时被“移动”。移动构造函数和移动赋值操作符接受一个右值引用参数,如下所示:
class Movable { public: Movable() = default; ~Movable() = default; // 移动构造函数 Movable(Movable&& other) : data_(std::move(other.data_)) { other.data_ = nullptr; // 将资源所有权转移后,将原对象的指针置空 } // 移动赋值操作符 Movable& operator=(Movable&& other) { if (this != &other) { data_ = std::move(other.data_); other.data_ = nullptr; } return *this; } private: int* data_; };
在这个例子中,
Movable
类的移动构造函数和移动赋值操作符都被定义了,它们使用std::move
来转移资源的所有权。移动操作通常涉及将资源从一个对象转移到另一个对象,而不是复制资源。
为什么采用这种设计?¶
这种设计模式的优势在于:
- 资源管理:它有助于确保资源不会被不必要地复制,从而避免资源泄漏或不必要的性能开销。
- 异常安全:通过避免复制,可以减少在资源管理过程中发生异常的风险。
- 所有权明确:它明确了资源的所有权,使得资源的管理和释放更加直观和安全。
典型应用¶
这种模式通常用于智能指针(如 std::unique_ptr
)、文件流(如 std::ofstream
)、网络连接等资源管理类。例如,std::unique_ptr
就是一个典型的 "可移动,但不可拷贝" 的类,它通过移动语义来安全地转移资源的所有权。
总结来说,"可移动,但不可拷贝" 是一种资源管理策略,它通过禁止拷贝并支持移动来优化资源的使用和提高程序的安全性。
示例:创建两个执行线程,并在实例之间转移所有权:
#include <iostream>
#include <thread>
void some_function() { std::cout << "some function" << std::endl; }
void some_other_function() { std::cout << "some other function" << std::endl; }
int main() {
std::thread t1(some_function);
std::thread t2 = std::move(t1); // t1所有权转移给了 t2
t1 = std::thread(
some_other_function); // 上面的 t1
// 已经与线程没有关联了,所以可以重新与一个新线程进行关联
std::thread t3; // 默认构造方式创建了一个 thread,但未与任何执行线程进行关联
t3 = std::move(t2); // 将t2关联的线程所有权转移给 t3,因为 t2
// 是一个命名对象,这里面需要显示调用 std::move 进行移动
t1 = std::move(t3); // 由于 t1 已经关联了一个执行线程,此时再将 t3 转移到
// t1,此时 t1 会先调用std::teriminate()终止程序继续执行
return 0;
}
作为函数返回值转移所有权¶
std::thread f()
{
void some_function();
return std::thread(some_function);
}
std::thread g()
{
void some_other_function(int);
std::thread t(some_other_function,42);
return t;
}
作为函数参数转移所有权¶
void f(std::thread t);
void g()
{
void some_function();
f(std::thread(some_function));
std::thread t(some_function);
f(std::move(t));
}
上面代码中 f 函数即使没有显示声明接受一个右值引用,但是 std::thread 的函数构造函数和赋值操作符都被删除了,使得 std::thread 对象不能被拷贝或赋值,但可以被移动。因此,std::thread 对象在被创建时(如 std::thread(some_function))是一个右值,可以被移动构造到 f 函数的参数 t 中。
当你使用 std::move(t) 时,你实际上是在显式地将 t 转换为一个右值引用,这会触发 f 函数的移动构造版本(如果存在的话)。但是,即使 f 的参数 t 被声明为一个左值引用(std::thread&),它仍然可以接收一个右值,因为左值引用可以绑定到右值上,这是 C++ 的一个特性。
然而,最佳实践是将这类函数的参数声明为右值引用,以明确表示函数的意图是移动而不是拷贝传入的对象。因此,更明确的声明应该是:
这样的声明清楚地表明了 f 函数期望接收一个右值引用,并且会移动传入的对象。这不仅使得代码的意图更加清晰,也避免了潜在的混淆。f(std::thread(some_function)); 创建了一个临时的 std::thread 对象,它是一个右值,可以被移动到 f 函数的参数中。而 f(std::move(t)); 显式地将 t 转换为右值引用,这也能够工作,因为 std::move 只是将对象转换为右值引用,而 f 函数的参数可以接受任何 std::thread 类型的对象,无论是左值还是右值。
基于移动语义实现等待线程完成¶
将线程移动到 scoped_thread 类中,并在析构函数中调用 join()来实现等待线程完成。
class scoped_thread
{
std::thread t;
public:
explicit scoped_thread(std::thread t_):t(std::move(t_)) {
if(!t.joinable())
throw std::logic_error("No thread");
}
~scoped_thread() {
t.join();
}
scoped_thread(scoped_thread const&)=delete;
scoped_thread& operator=(scoped_thread const&)=delete;
};
// 使用示例
#include "func_header.hpp"
void f()
{
int some_local_state;
scoped_thread t(std::thread(func(some_local_state)));
do_something_in_current_thread();
}
多个工作线程并等待结束¶
void do_work(unsigned id);
void f()
{
std::vector<std::thread> threads;
for(unsigned i=0; i < 20; ++i)
{
threads.push_back(std::thread(do_work,i)); // 产生线程
}
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join)); // 对每个线程调用join()
}
运行时决定线程数量¶
std::thread::hardware_concurrency()
返回当前系统的硬件并发能力,返回值是一个无符号整数,表示可以同时运行的线程数。当系统信息无法获取时,函数返回0。
原生并行版的 std::accumulate¶
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumulate(first,last,result);
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length) { // 如果输入范围为空,返回 init值
return init;
}
unsigned long const min_per_thread=25; // 每个线程工作的任务数默认是 25 个
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread; // 完成所有任务需要的最大线程数
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads=
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads); // 实际核数与所需核数取最小值
unsigned long const block_size=length/num_threads; // 每个线程的任务数
std::vector<T> results(num_threads); // 存储结果
std::vector<std::thread> threads(num_threads-1); // 创建线程数组
Iterator block_start=first;
for(unsigned long i=0; i < (num_threads-1); ++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
threads[i]=std::thread(
accumulate_block<Iterator,T>(),
block_start,block_end,std::ref(results[i]));
block_start=block_end;
}
accumulate_block<Iterator,T>()( // 最后一个线程
block_start,last,results[num_threads-1]);
// 等待所有线程结束
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join));
// 汇总结果
return std::accumulate(results.begin(),results.end(),init);
}
识别线程¶
我们可以通过 std::this_thread::get_id()
来识别当前线程。get_id()
返回 std::thread::id
类型的对象,它表示当前线程的 ID。
std::thread::id master_thread;
void some_core_part_of_algorithm()
{
if(std::this_thread::get_id()==master_thread)
{
do_master_thread_work(); // 如果是主线程
} else {
do_common_work();
}
}