线程管理

线程管理基础

启动线程

每个程序都至少有一个线程,即 main 函数所在的线程,称之为主线程。

将函数添加为 std::thread 的参数即可启动线程

#include <thread>
#include <iostream>

void f()
{
   std::cout << "hello world";
}

int main()
{
   std::thread t(f);
   t.join();
}

std::thread 的参数也可以是带 operator() 的对象实例或者 lambda

struct A {
    void operator()()const
    {
        std::cout << 1 <<"\n";
    }
};


int main()
{
    A a;
    std::thread t1(a);
    //@ std::thread t(A()); //@ most vexing parse,A()被视为函数声明
    //@ 解决 most vexing parse的方法:
    std::thread t2((A()));
    std::thread t3{ A() };

    //@ 使用 lambda
    std::thread t4([] {std::cout << 1 << "\n"; });

    t1.join();
    t2.join();
    t3.join();
    t4.join();

    return 0;
}

join 和 detach

启动线程后在线程销毁前要对其调用 joindetach,否则 std::thread 的析构函数会调用 std::terminate 终止程序。

detach

  • detach 是让目标线程成为守护线程(daemon threads)。
  • 一旦 detach ,目标线程将独立执行,即便其对应的 thread 对象销毁也不影响线程的执行。
  • 一旦 detach ,主调线程无法再取得该子线程的控制权。这个子线程将被 C++ 运行时库接管,当该线程执行结束的时候,由 C++ 运行时库负责回收该线程的资源。
struct A {
    int& i;
    A(int& x) :i(x) {}
    void operator()()const {
        for (int i = 0; i < 100000; i++) {
            //@ dosomething(i);
        }
    }
};

void f()
{
    int x = 0;
    A a(x);
    std::thread t(a);
    t.detach(); //@ 不等待t结束
    //@ 函数运行结束后线程t可能还在执行,i是x的引用,但是此时x被销毁了
}

int main()
{
    std::thread t(f);  //@ 导致空悬引用
}

join

  • join 之后,当前线程会一直阻塞,直到目标线程执行完成。
  • join 之后,当子线程执行结束,主调线程将回收子调线程资源,并继续运行。
  • 如果目标线程的任务非常耗时,就要考虑好是否需要在主线程上等待它了,因为这很可能会导致主线程卡住。
void f()
{
    int x = 0;
    A a(x);
    std::thread t(a);
    t.join();   //@ 等待t结束
}

joinable

  • joinable 可以用来判断这个线程当前是否可以被 join
  • join 之后不能再被重复 join,反复 join将出错。
  • 使用 detach 分离线程会让线程在后台运行,线程分离后与主线程无法直接交互,也不能被 join 如果 detach 之后再 join 将出错。
  • 如果线程运行过程中发生异常,之后调用的 join 会被忽略,为此需要捕获异常并在处理异常时调用 join
void f()
{
    int x = 0;
    A a(x);
    std::thread t(a);
    try
    {
        doSomethingHere();
    }
    catch(...)
    {
        t.join();
        throw;
    }
    t.join();
}

thread_guard 类

用 RAII类来管理 std::thread

class thread_guard {
    std::thread& t;
public:
    explicit thread_guard(std::thread& x) :t(x) {}
    ~thread_guard() { if (t.joinable()) t.join(); }
    thread_guard(const thread_guard&) = delete;
    thread_guard& operator= (const thread_guard&) = delete;
};

thread 对象 t 被销毁时将会调用 t.join()

void f()
{
    int x = 0;
    A a(x);
    std::thread t(a);
    thread_guard g(t);
    //@ doSomethingHere();
    //@ 局部对象逆序销毁,优先销毁thread_guard对象,从而调用t.join()
}

为线程传递参数

  • 有参数的函数也能传给 std::thread,参数的默认实参会被忽略。
void f(int i = 1) //@ 传递给 std::thread 时默认实参会被忽略
{
    std::cout << i<<"\n";
}

int main()
{
    std::thread t(f,2); //@ 第一个参数为函数名,其余参数为函数的参数
    t.join();
}
  • std::thread 会无视参数的引用类型,因此需要使用 std::ref 来生成一个引用包裹对象以传入引用类型
void f(int& n) { ++n; }

int main()
{
    int i = 1;
    std::thread t(f, std::ref(i));
    t.join();
    std::cout << i << "\n"; //@ 2
}
  • 也可以传递类成员函数
public:
    void f(int i) { std::cout << i <<"\n"; }
};

int main()
{
    A a;
    std::thread t(&A::f, &a, 42); //@ 第一个参数为成员函数地址,第二个参数为实例地址,之后为哈数参数
    t.join();
}
  • 如果参数是move-only对象则需要使用 std::move
void f(std::unique_ptr<int> p)
{
    std::cout << *p << "\n";
}

int main()
{
    std::unique_ptr<int> p(new int(42));
    std::thread t(f, std::move(p));
    t.join();
}

转移线程所有权

  • 一个线程不能重复被关联。
std::thread t1(f);
std::thread t2 = std::move(t1); //@ t1所有权给t2,t2关联执行f的线程
t1 = std::thread(g); //@ t1重新关联一个执行g的线程
std::thread t3;
t3 = std::move(t2); //@ t3关联t2的线程,t2无关联
t1 = std::move(t3); //@ t1已有关联g的线程,调用std::terminate终止程序
  • 线程所有权可以转移到函数外
void f(int i) { std::cout << i << "\n"; }

std::thread g()
{
    return std::thread(f, 42);
}

int main()
{
    std::thread t{ g() };
    t.join();
}
void f(std::thread t);

void g()
{
    f(std::thread(someFunction));
    std::thread t(someFunction);
    f(std::move(t));
}
  • 移动操作同样适用于支持移动的容器
void someFunction() {
    std::cout << "do something" << "\n";
}

void f()
{
    std::vector<std::thread> v;
    for (int i = 0; i < 10; ++i)
    {
        v.emplace_back(someFunction);
    }
    std::for_each(std::begin(v), std::end(v), std::mem_fn(&std::thread::join));
}


int main()
{
    f();
}

scoped_thread 类

直接用 std::thread 构造的RAII类:

class scoped_thread {
    std::thread t;
public:
    explicit scoped_thread(std::thread x) : t(std::move(x))
    {
        if (!t.joinable())
        {
            throw std::logic_error("no thread");
        }
    }
    ~scoped_thread() { t.join(); }
    scoped_thread(const scoped_thread&) = delete;
    scoped_thread& operator=(const scoped_thread&) = delete;
};

struct A { ... };

void f()
{
    int x = 0;
    scoped_thread g(std::thread{A(x)});
    doSomethingHere();
} //@ scoped_thread对象销毁将自动调用join

运行期选择线程数量

hardware_concurrency 会返回支持的并发线程数。

多核系统中,返回值可以是 CPU 核芯的数量。返回值也仅仅是一个提示,当系统信息无法获取时,函数也会返回0。

std::cout << std::thread::hardware_concurrency() << "\n";

并行版本的 std::accumulate

template<typename Iterator, typename T>
struct accumulate_block {
    void operator()(Iterator first, Iterator last, T& res)
    {
        res = std::accumulate(first, last, res);
    }
};

template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
    const unsigned long len = std::distance(first, last);
    if (!len) return init;
    const unsigned long min_per_thread = 25;
    const unsigned long max_threads = (len + min_per_thread - 1) / min_per_thread;
    const unsigned long hardware_threads = std::thread::hardware_concurrency();
    //@ 设置线程数量,设置2只是一种选择
    const unsigned long num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    const unsigned long block_size = len / num_threads; //@ 每个线程中的数据量
    std::vector<T> res(num_threads);
    std::vector<std::thread> threads(num_threads - 1); //@ 已有一个主线程,所以少一个线程
    Iterator block_start = first;
    for (unsigned long i = 0; i < num_threads - 1; ++i)
    {
        Iterator block_end = block_start;
        std::advance(block_end, block_size); //@ block_end指向当前块的尾部
        threads[i] = std::thread(accumulate_block<Iterator, T>{},
            block_start, block_end, std::ref(res[i]));
        block_start = block_end;
    }
    accumulate_block<Iterator, T>()(block_start, last, res[num_threads - 1]);
    std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
    return std::accumulate(res.begin(), res.end(), init);
}

需要注意的是线程的上下文切换也是比较耗时的,为了避免不必要的开销,需要根据硬件支持的并发数量,输入数据的数量大小等因素确定并发的线程数量。

线程标识

  • 可以通过对线程实例调用成员函数 get_id 或在当前线程中调用 std::this_thread::get_id 获取 线程id
  • 线程 id 允许拷贝和比较,因此可以将其作为容器的键值。如果两个线程 id 相等,则两者是同一线程或都无线程。
std::thread::id masterThread; //@ 主线程

void f()
{
    if (std::this_thread::get_id() == masterThread)
    { //@ 主线程要做一些额外工作,即可通过比较线程id来确认主线程
        doMasterThreadWork();
    }
    doCommonWork();
}