并行算法

执行策略

C++17 对标准库算法重载了并行版本,区别是多了一个指定执行策略的参数。

std::vector<int> v;
std::sort(std::execution::par, v.begin(), v.end());

std::execution::par 表示允许多线程并行执行此算法,注意这是一个权限而非强制要求,此算法依然可以被单线程执行

另外,如果指定了执行策略,算法复杂度的要求也更宽松,因为并行算法为了利用好系统的并行性通常要做更多工作。比如把工作划分给100个处理器,即使总工作是原来的两倍,也仍然能获得原来的五十倍的性能。

执行策略类:

std::execution::sequenced_policy
std::execution::parallel_policy
std::execution::parallel_unsequenced_policy
std::execution::unsequenced_policy //@ C++20

并指定了对应的全局对象:

std::execution::seq
std::execution::par
std::execution::par_unseq
std::execution::unseq //@ C++20

如果使用执行策略,算法的行为就会受执行策略影响,影响方面包括:算法复杂度、抛异常时的行为、算法步骤的执行位置(where)、方式(how)、时刻(when)。

除了管理并行执行的调度开销,许多并行算法会执行更多的核心操作(交换、比较、使用函数对象等),这样可以减少总的实际消耗时间,从而全面提升性能。这就是算法复杂度受影响的原因,其具体改变因算法不同而异。

在不指定执行策略时,如下对算法的调用,抛出的异常会被传播:

std::for_each(v.begin(), v.end(), [](auto x){ throw my_exception(); });

而指定执行策略时,如果算法执行期间抛出异常,则行为结果由执行策略决定。如果有任何未捕获的异常,执行策略将调用 std::terminate 终止程序,唯一可能抛出异常的情况是,内部操作不能获取足够的内存资源时抛出 std::bad_alloc。如下操作将调用 std::terminate 终止程序。

std::for_each(std::execution::seq, v.begin(), v.end(), [](auto x){ throw my_exception(); });

不同的执行策略的执行方式也不相同。执行策略会指定执行算法步骤的代理,可以是常规线程、矢量流、GPU 线程或其他任何东西。执行策略也会指定算法步骤运行的顺序限制,比如是否要以特定顺序运行、不同算法步骤的一部分是否可以互相交错或并行运行等。下面对不同的执行策略进行详细解释。

std::execution::sequenced_policy

std::execution::sequenced_policy 策略要求可以不并行执行,所有操作将执行在一个线程上。但它也是执行策略,因此与其他执行策略一样会影响算法复杂度和异常行为。

所有执行在一个线程上的操作必须以某个确定顺序执行,因此这些操作是不能互相交错的。但不规定具体顺序,因此对于不同的函数调用可能产生不同的顺序。

std::vector<int> v(1000);
int n = 0;
//@ 下面把1~1000存入容器,但存入顺序是未指定的(可能顺序也可能乱序)
std::for_each(std::execution::seq, v.begin(), v.end(), [&](int& x) { x = ++n; });

因此 std::execution::sequenced_policy 策略很少要求算法使用迭代器、值、可调用对象,它们可以自由地使用同步机制,可以依赖于同一线程上调用的操作,尽管不能依赖于这些操作的顺序。

std::execution::parallel_policy

std::execution::parallel_policy 策略提供了基本的跨多个线程的并行执行,操作可以执行在调用算法的线程上,或执行在由库创建的线程上,在一个给定线程上的操作必须以确定顺序执行,并且不能相互交错。同样这个顺序是未指定的,对于不同的调用可能会有不同的顺序。一个给定的操作将在一个固定的线程上运行完整个周期。

因此 std::execution::parallel_policy 策略对于迭代器、值、可调用对象的使用就有一定要求,它们在并行调用时不能造成数据竞争,并且不能依赖于统一线程上的其他操作,或者说只能依赖于不运行在同一线程上的其他操作。

大多数情况都可以使用 std::execution::parallel_policy 策略。

std::for_each(std::execution::par, v.begin(), v.end(), [](auto& x){ ++x; });

只有在元素之间有特定顺序或对共享数据的访问不同步时,它才有问题:

std::vector<int> v(1000);
int n = 0;
std::for_each(std::execution::par, v.begin(), v.end(),
    [&](int& x) { x = ++n; }); //@ 如果多个线程执行lambda就会对n产生数据竞争

因此使用 std::execution::parallel_policy 策略时,应该事先考虑可能出现的未定义行为。可以用 mutex 或原子变量来解决竞争问题,但这就影响了并发性。不过这个例子只是为了阐述此情况,一般使用 std::execution::parallel_policy 策略时都是允许同步访问共享数据的。

std::execution::parallel_unsequenced_policy

std::execution::parallel_unsequenced_policy 策略提供了最大可能的并行化,代价是对算法使用的迭代器、值和可调用对象有最严格的的要求。

使用 std::execution::parallel_unsequenced_policy 策略的算法允许以无序的方式在任意未指定的线程中执行,并且在每个线程中彼此不排序。也就是说,操作可以在单个线程上互相交错,同一线程上的第二个操作可以开始于第一个操作结束前,并且可以在线程间迁移,一个给定的操作可以开始于一个线程,运行于另一线程,而完成于第三个线程。

使用 std::execution::parallel_unsequenced_policy 策略时,提供给算法的迭代器、值、可调用对象上的操作不能使用任何形式的同步,也不能调用与其他代码同步的任何函数。这意味着操作只能作用于相关元素,或任何基于这些元素的可访问数据,并且不能修改任何线程间或元素间的共享数据。

标准库并行算法

<algorithm><numberic> 中的大部分算法都重载了并行版本。std::accumlate 没有并行版本,但 C++17 提供了std::reduce

auto res = std::accumulate(v.begin(), v.end(), 0);
auto res = std::reduce(std::execution::par, v.begin(), v.end());

如果常规算法有并行版的重载,则并行版对常规算法原有的所有重载都有一个对应重载版本:

template<class RandomIt>
void sort(RandomIt first, RandomIt last);

template<class RandomIt, class Compare>
void sort(RandomIt first, RandomIt last, Compare comp);

//@ 并行版对应有两个重载
template<class ExecutionPolicy, class RandomIt>
void sort(ExecutionPolicy&& policy, RandomIt first, RandomIt last);

template<class ExecutionPolicy, class RandomIt, class Compare>
void sort(ExecutionPolicy&& policy, RandomIt first, RandomIt last, Compare comp);

但并行版的重载对部分算法有一些区别,如果常规版本使用的是输入迭代器或输出迭代器,则并行版的重载将使用前向迭代器。

template<class InputIt, class OutputIt>
OutputIt copy(InputIt first, InputIt last, OutputIt d_first);

template<class ExecutionPolicy, class ForwardIt1, class ForwardIt2>
ForwardIt2 copy(ExecutionPolicy&& policy, ForwardIt1 first, ForwardIt1 last, ForwardIt2 d_first);

输入迭代器只能用来读取指向的值,迭代器自增后就再也无法访问之前指向的值,它一般用于从控制台或网络输入,或生成序列,比如 std::istream_iterator。同理,输出迭代器一般用来输出到文件,或添加值到容器,也是单向的,比如 std::ostream_iterator

前向迭代器返回元素的引用,因此可以用于读写,它同样只能单向传递,std::forward_list 的迭代器就是前向迭代器,虽然它不可以回到之前指向的值,但可以存储一个指向之前元素的拷贝(比如 std::forward_list::begin)来重复利用。对于并行性来说,可以重复利用迭代器很重要。此外,前向迭代器的自增不会使其他的迭代器拷贝失效,这样就不用担心其他线程中的迭代器受影响。如果使用输入迭代器,所有线程只能共用一个迭代器,显然无法并行。

std::execution::par 是最常用的策略,除非实现提供了更符合需求的非标准策略。一些情况下也可以使用 std::execution::par_unseq,虽然这不保证更好的并发性,但它给了库通过重排和交错任务来提升性能的可能性,不过代价就是不能使用同步机制,要确保线程安全只能让算法本身不会让多个线程访问同一元素,并在调用该算法的外部使用同步机制来避免其他线程对数据的访问。

//@ 内部带同步机制只能使用std::execution::par,使用std::execution::par_unseq则会出现未定义行为
class A {
    mutable std::mutex m;
    int n = 0;
public:
    int getVal() const
    {
        std::scoped_lock l(m);
        return n;
    }
    void inc()
    {
        std::scoped_lock l(m);
        ++n;
    }
};

void f(std::vector<A>& v)
{
    std::for_each(std::execution::par, v.begin(), v.end(), [](A& x) { x.inc(); });
}

//@ 如果要使用std::execution::par_unseq,则内部不能使用同步机制,同步机制应该在外部使用
class A {
    int n = 0;
public:
    int getVal() const { return n; }
    void inc() { ++n; }
};

class B {
    std::mutex m;
    std::vector<A> v;
public:
    void lock() { m.lock(); }
    void unlock() { m.unlock(); }
    std::vector<A>& getVec() { return v; }
};

void f(B& x)
{
    std::scoped_lock l(x);
    auto& v = x.getVec();
    std::for_each(std::execution::par_unseq, v.begin(), v.end(), [](A& x) { x.inc(); });
}

下面是一个更实际的例子。假如有一个网站,访问日志有上百万条,为了方便查看数据需要对日志进行处理。对日志每行的处理是独立的工作,很适合使用并行算法。

struct log {
    std::string page;
    time_t visit_time;
    //@ any other fields
};

extern log parse(const std::string& line);

using Map = std::unordered_map<std::string, unsigned long long>;

Map f(const std::vector<std::string>& v)
{
    struct combine {
        //@ log、Map两个参数有四种组合,所以需要四个重载
        Map operator()(Map lhs, Map rhs) const
        {
            if (lhs.size() < rhs.size()) std::swap(lhs, rhs);
            for (const auto& x : rhs) lhs[x.first] += x.second;
            return lhs;
        }
        Map operator()(log l, Map m) const
        {
            ++m[l.page];
            return m;
        }
        Map operator()(Map m, log l) const
        {
            ++m[l.page];
            return m;
        }
        Map operator()(log lhs, log rhs) const
        {
            Map m;
            ++m[lhs.page];
            ++m[rhs.page];
            return m;
        }
    };
    return std::transform_reduce(std::execution::par, v.begin(), v.end(),
        Map{}, //@ 初始值,一个空的map
        combine{}, //@ 结合两个元素的二元操作
        parse); //@ 对每个元素执行的一元操作
}