线程管理¶
线程管理基础¶
启动线程¶
每个程序都至少有一个线程,即 main 函数所在的线程,称之为主线程。
将函数添加为 std::thread 的参数即可启动线程
#include <thread>
#include <iostream>
void f()
{
std::cout << "hello world";
}
int main()
{
std::thread t(f);
t.join();
}
std::thread
的参数也可以是带 operator() 的对象实例或者 lambda:
struct A {
void operator()()const
{
std::cout << 1 <<"\n";
}
};
int main()
{
A a;
std::thread t1(a);
//@ std::thread t(A()); //@ most vexing parse,A()被视为函数声明
//@ 解决 most vexing parse的方法:
std::thread t2((A()));
std::thread t3{ A() };
//@ 使用 lambda
std::thread t4([] {std::cout << 1 << "\n"; });
t1.join();
t2.join();
t3.join();
t4.join();
return 0;
}
join 和 detach¶
启动线程后在线程销毁前要对其调用 join 或 detach,否则 std::thread 的析构函数会调用 std::terminate 终止程序。
detach¶
- detach 是让目标线程成为守护线程(daemon threads)。
- 一旦
detach
,目标线程将独立执行,即便其对应的
thread对象销毁也不影响线程的执行。 - 一旦 detach ,主调线程无法再取得该子线程的控制权。这个子线程将被 C++ 运行时库接管,当该线程执行结束的时候,由 C++ 运行时库负责回收该线程的资源。
struct A {
int& i;
A(int& x) :i(x) {}
void operator()()const {
for (int i = 0; i < 100000; i++) {
//@ dosomething(i);
}
}
};
void f()
{
int x = 0;
A a(x);
std::thread t(a);
t.detach(); //@ 不等待t结束
//@ 函数运行结束后线程t可能还在执行,i是x的引用,但是此时x被销毁了
}
int main()
{
std::thread t(f); //@ 导致空悬引用
}
join¶
- join 之后,当前线程会一直阻塞,直到目标线程执行完成。
- join 之后,当子线程执行结束,主调线程将回收子调线程资源,并继续运行。
- 如果目标线程的任务非常耗时,就要考虑好是否需要在主线程上等待它了,因为这很可能会导致主线程卡住。
void f()
{
int x = 0;
A a(x);
std::thread t(a);
t.join(); //@ 等待t结束
}
joinable¶
- joinable 可以用来判断这个线程当前是否可以被 join。
- join 之后不能再被重复 join,反复 join将出错。
- 使用
detach
分离线程会让线程在后台运行,线程分离后与主线程无法直接交互,也不能被
join 如果
detach之后再 join 将出错。 - 如果线程运行过程中发生异常,之后调用的 join 会被忽略,为此需要捕获异常并在处理异常时调用 join 。
void f()
{
int x = 0;
A a(x);
std::thread t(a);
try
{
doSomethingHere();
}
catch(...)
{
t.join();
throw;
}
t.join();
}
thread_guard 类¶
用 RAII类来管理 std::thread:
class thread_guard {
std::thread& t;
public:
explicit thread_guard(std::thread& x) :t(x) {}
~thread_guard() { if (t.joinable()) t.join(); }
thread_guard(const thread_guard&) = delete;
thread_guard& operator= (const thread_guard&) = delete;
};
当 thread 对象 t 被销毁时将会调用 t.join():
void f()
{
int x = 0;
A a(x);
std::thread t(a);
thread_guard g(t);
//@ doSomethingHere();
//@ 局部对象逆序销毁,优先销毁thread_guard对象,从而调用t.join()
}
为线程传递参数¶
- 有参数的函数也能传给 std::thread,参数的默认实参会被忽略。
void f(int i = 1) //@ 传递给 std::thread 时默认实参会被忽略
{
std::cout << i<<"\n";
}
int main()
{
std::thread t(f,2); //@ 第一个参数为函数名,其余参数为函数的参数
t.join();
}
- std::thread 会无视参数的引用类型,因此需要使用 std::ref 来生成一个引用包裹对象以传入引用类型
void f(int& n) { ++n; }
int main()
{
int i = 1;
std::thread t(f, std::ref(i));
t.join();
std::cout << i << "\n"; //@ 2
}
- 也可以传递类成员函数
public:
void f(int i) { std::cout << i <<"\n"; }
};
int main()
{
A a;
std::thread t(&A::f, &a, 42); //@ 第一个参数为成员函数地址,第二个参数为实例地址,之后为哈数参数
t.join();
}
- 如果参数是move-only对象则需要使用 std::move
void f(std::unique_ptr<int> p)
{
std::cout << *p << "\n";
}
int main()
{
std::unique_ptr<int> p(new int(42));
std::thread t(f, std::move(p));
t.join();
}
转移线程所有权¶
- 一个线程不能重复被关联。
std::thread t1(f);
std::thread t2 = std::move(t1); //@ t1所有权给t2,t2关联执行f的线程
t1 = std::thread(g); //@ t1重新关联一个执行g的线程
std::thread t3;
t3 = std::move(t2); //@ t3关联t2的线程,t2无关联
t1 = std::move(t3); //@ t1已有关联g的线程,调用std::terminate终止程序
- 线程所有权可以转移到函数外
void f(int i) { std::cout << i << "\n"; }
std::thread g()
{
return std::thread(f, 42);
}
int main()
{
std::thread t{ g() };
t.join();
}
- std::thread 也能作为参数
void f(std::thread t);
void g()
{
f(std::thread(someFunction));
std::thread t(someFunction);
f(std::move(t));
}
- 移动操作同样适用于支持移动的容器
void someFunction() {
std::cout << "do something" << "\n";
}
void f()
{
std::vector<std::thread> v;
for (int i = 0; i < 10; ++i)
{
v.emplace_back(someFunction);
}
std::for_each(std::begin(v), std::end(v), std::mem_fn(&std::thread::join));
}
int main()
{
f();
}
scoped_thread 类¶
直接用 std::thread 构造的RAII类:
class scoped_thread {
std::thread t;
public:
explicit scoped_thread(std::thread x) : t(std::move(x))
{
if (!t.joinable())
{
throw std::logic_error("no thread");
}
}
~scoped_thread() { t.join(); }
scoped_thread(const scoped_thread&) = delete;
scoped_thread& operator=(const scoped_thread&) = delete;
};
struct A { ... };
void f()
{
int x = 0;
scoped_thread g(std::thread{A(x)});
doSomethingHere();
} //@ scoped_thread对象销毁将自动调用join
运行期选择线程数量¶
hardware_concurrency 会返回支持的并发线程数。
多核系统中,返回值可以是 CPU 核芯的数量。返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0。
std::cout << std::thread::hardware_concurrency() << "\n";
并行版本的 std::accumulate :
template<typename Iterator, typename T>
struct accumulate_block {
void operator()(Iterator first, Iterator last, T& res)
{
res = std::accumulate(first, last, res);
}
};
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
const unsigned long len = std::distance(first, last);
if (!len) return init;
const unsigned long min_per_thread = 25;
const unsigned long max_threads = (len + min_per_thread - 1) / min_per_thread;
const unsigned long hardware_threads = std::thread::hardware_concurrency();
//@ 设置线程数量,设置2只是一种选择
const unsigned long num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
const unsigned long block_size = len / num_threads; //@ 每个线程中的数据量
std::vector<T> res(num_threads);
std::vector<std::thread> threads(num_threads - 1); //@ 已有一个主线程,所以少一个线程
Iterator block_start = first;
for (unsigned long i = 0; i < num_threads - 1; ++i)
{
Iterator block_end = block_start;
std::advance(block_end, block_size); //@ block_end指向当前块的尾部
threads[i] = std::thread(accumulate_block<Iterator, T>{},
block_start, block_end, std::ref(res[i]));
block_start = block_end;
}
accumulate_block<Iterator, T>()(block_start, last, res[num_threads - 1]);
std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
return std::accumulate(res.begin(), res.end(), init);
}
需要注意的是线程的上下文切换也是比较耗时的,为了避免不必要的开销,需要根据硬件支持的并发数量,输入数据的数量大小等因素确定并发的线程数量。
线程标识¶
- 可以通过对线程实例调用成员函数 get_id 或在当前线程中调用 std::this_thread::get_id 获取 线程id 。
- 线程
id允许拷贝和比较,因此可以将其作为容器的键值。如果两个线程id相等,则两者是同一线程或都无线程。
std::thread::id masterThread; //@ 主线程
void f()
{
if (std::this_thread::get_id() == masterThread)
{ //@ 主线程要做一些额外工作,即可通过比较线程id来确认主线程
doMasterThreadWork();
}
doCommonWork();
}