去年我发布了一篇简单解析了 D 语言中如何并行执行任务的博客,用于记录我当前的需求。不过我一直打算做一篇简单的详解来解释 D 语言中各种并发方式与其意义。

并行

采取并行工作,需要导入 std.parallelism
并行通常采用 Task 的形式分发给新的线程。parallelism 中存在一个任务队列 taskPool,创建的新任务可以加入这个任务队列集中管理。

Task

类 TaskPool

TaskPool 是一个集中管理任务的队列,在 parallelism 中具备一个单例实现的 taskPool 成员。
加入或通过 taskPool 创建的成员任务将会被排程执行。

函数 task

task 是在 GC 的堆栈中创建一个新的并发任务 Task。接受一个委托(delegate)函数(function),以及对应的参数。
task 创建的 Task 默认通过 executeInNewThread 执行,而不是通过 taskPool 排程。

import std.file;
import std.parallelism;

void main() {
  auto fileTask = task!read("myFile.txt");
  auto fileData = fileTask.yieldForce();
}

函数 scopedTask

task 相似,scopedTask 也会创建一个新的并发任务 Task。但是它不会在 GC 堆里创建,而是在当前堆栈中创建。其生命周期不会超过创建它的作用域堆栈的生命周期。 通常建议在作用域函数结束前调用 Task#yieldForce 确保 Task 能够在作用域所在的堆栈被销毁之前结束。

import std.file;
import std.parallelism;

void main() {
  auto fileTask = scopedTask!read("myFile.txt");
  auto fileData = fileTask.yieldForce();
}

成员方法 TaskPool#parallel

通常通过单例 taskPool 创建,parallel 允许通过 foreach 迭代执行,并通过 workUnitSize 限定其最大线程数量。对于那些需要大量的排程执行的任务有着极佳的好处——这意味着排程的任务能够依次被调用而避免竞争共享。如果没有指定 workUnitSize,那么将会采用默认值。
parallel 会隐式的创建新的管道执行任务,对于较为简单的工作可以提高更多的工作线程,但对于较为复杂的任务,最好保持采用 1 作为最大线程数量。 当然,parallel 通常在一个 foreach 中执行,但这并不是真实的 foreach 语句。任何跳出 foreach 的方式,比如 break 或者 return,亦或是 goto,都会抛出 ParallelForeachError。 值得一提的是,如果并行期间发生了未能及时回收的错误,其他的任务会照常执行,最后通过 Throwable#next 进行抛出。不过抛出顺序是不确定的。

import std.parallelism;

void main() {
    auto data = new double[1000];

    foreach(i, ref elem; taskPool.parallel(data, 10)) {
      elem = i + 1;
    }
}

成员方法 TaskPool#amap

amap 通常是随机执行的,并返回处理后的队列。如果未指定返回结果的变量,结果将会尽快被回收。
parallel 相同,workUnitSize 会决定最大线程量。
相较于 mapamap 有着更好的访问性能。不过 amap 要求写入与访问都是线程安全的。

import std.parallelism;

void main() {
    double[] data = [0, 1, 2];
    double pow(double a) = a * a;

    data = taskPool.amap!pow(data, 10);
}

成员方法 TaskPool#map

惰性访问管道 map 将会有序的处理每一个元素,随后写入到缓冲区。因此,通常在 map 中写入无需每次都进行原子或同步。对于需要有序访问的情况下采用 map 而非 amap
map 采用 bufSize 作为缓冲区大小值,以及 workUnitSize 作为最大线程。通常情况下,如果未指定缓冲区大小, map 会自己评估。

import std.parallelism;

void main() {
    double[] data = [0, 1, 2];
    double pow(double a) = a * a;

    data = taskPool.map!pow(data);
}

成员方法 TaskPool#asyncBuf

对于那些昂贵的,费时的操作,通过 asnycBuf 缓冲执行限定缓冲区的执行时非常有用。

import std.parallelism;
import std.conv, std.string;

void main() {
  auto lines = File("foo.txt").byLine;
  auto duped = taskPool.map!"a.idup"(lines);

  string[] lineData;
  auto asyncReader = taskPool.asyncBuf(duped);
  foreach(line; asyncReader) {
    auto ls = line.split("\n");
    lineData ~= ls;
  }
}

成员方法 TaskPool#reduce 与 TaskPool#fold

这两个方法其实本质上差不多,但是参数位存在一定差异。它们都用于数学逻辑运算。
两个方法都会取出一枚种子作为基数,随后对列表数据进行定向操作。看起来是这样的:

auto sum = taskPool.reduce!"a + b"(0, [1, 2, 3]);

在使用 reduce 时,第一位参数为种子。否则将会从列表的第一位,也就是 0 位取值作为种子。第三位则是 taskUnitSize
fold 则是反过来——第二位为种子。
如果提供了较多的模板,那么返回值将会是一个列表。刚才采用了 reduce 作为例子,那么现在采用 fold

auto dat = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 5);
assert(dat[0] == 10);
assert(dat[1] == 24);

信道通讯

去年的文章中已经完整的举例了信道通讯 spawn 必要的内容。

跳转 <

原子与共享

// TODO