D语言并发与多线程
去年我发布了一篇简单解析了 D 语言中如何并行执行任务的博客,用于记录我当前的需求。不过我一直打算做一篇简单的详解来解释 D 语言中各种并发方式与其意义。
并行
采取并行工作,需要导入 std.parallelism
。
并行通常采用 Task 的形式分发给新的线程。parallelism 中存在一个任务队列 taskPool,创建的新任务可以加入这个任务队列集中管理。
Task
类 TaskPool
TaskPool
是一个集中管理任务的队列,在 parallelism
中具备一个单例实现的 taskPool
成员。
加入或通过 taskPool
创建的成员任务将会被排程执行。
函数 task
task
是在 GC 的堆栈中创建一个新的并发任务 Task。接受一个委托(delegate)
或函数(function)
,以及对应的参数。
task
创建的 Task 默认通过 executeInNewThread
执行,而不是通过 taskPool
排程。
import std.file;
import std.parallelism;
void main() {
auto fileTask = task!read("myFile.txt");
auto fileData = fileTask.yieldForce();
}
函数 scopedTask
与 task
相似,scopedTask
也会创建一个新的并发任务 Task。但是它不会在 GC 堆里创建,而是在当前堆栈中创建。其生命周期不会超过创建它的作用域堆栈的生命周期。
通常建议在作用域函数结束前调用 Task#yieldForce
确保 Task 能够在作用域所在的堆栈被销毁之前结束。
import std.file;
import std.parallelism;
void main() {
auto fileTask = scopedTask!read("myFile.txt");
auto fileData = fileTask.yieldForce();
}
成员方法 TaskPool#parallel
通常通过单例 taskPool
创建,parallel
允许通过 foreach 迭代执行,并通过 workUnitSize
限定其最大线程数量。对于那些需要大量的排程执行的任务有着极佳的好处——这意味着排程的任务能够依次被调用而避免竞争共享。如果没有指定 workUnitSize
,那么将会采用默认值。
parallel
会隐式的创建新的管道执行任务,对于较为简单的工作可以提高更多的工作线程,但对于较为复杂的任务,最好保持采用 1 作为最大线程数量。
当然,parallel
通常在一个 foreach
中执行,但这并不是真实的 foreach
语句。任何跳出 foreach
的方式,比如 break
或者 return
,亦或是 goto
,都会抛出 ParallelForeachError
。
值得一提的是,如果并行期间发生了未能及时回收的错误,其他的任务会照常执行,最后通过 Throwable#next
进行抛出。不过抛出顺序是不确定的。
import std.parallelism;
void main() {
auto data = new double[1000];
foreach(i, ref elem; taskPool.parallel(data, 10)) {
elem = i + 1;
}
}
成员方法 TaskPool#amap
amap
通常是随机执行的,并返回处理后的队列。如果未指定返回结果的变量,结果将会尽快被回收。
与 parallel
相同,workUnitSize
会决定最大线程量。
相较于 map
,amap
有着更好的访问性能。不过 amap
要求写入与访问都是线程安全的。
import std.parallelism;
void main() {
double[] data = [0, 1, 2];
double pow(double a) = a * a;
data = taskPool.amap!pow(data, 10);
}
成员方法 TaskPool#map
惰性访问管道 map
将会有序的处理每一个元素,随后写入到缓冲区。因此,通常在 map
中写入无需每次都进行原子或同步。对于需要有序访问的情况下采用 map
而非 amap
。
map
采用 bufSize
作为缓冲区大小值,以及 workUnitSize
作为最大线程。通常情况下,如果未指定缓冲区大小, map
会自己评估。
import std.parallelism;
void main() {
double[] data = [0, 1, 2];
double pow(double a) = a * a;
data = taskPool.map!pow(data);
}
成员方法 TaskPool#asyncBuf
对于那些昂贵的,费时的操作,通过 asnycBuf
缓冲执行限定缓冲区的执行时非常有用。
import std.parallelism;
import std.conv, std.string;
void main() {
auto lines = File("foo.txt").byLine;
auto duped = taskPool.map!"a.idup"(lines);
string[] lineData;
auto asyncReader = taskPool.asyncBuf(duped);
foreach(line; asyncReader) {
auto ls = line.split("\n");
lineData ~= ls;
}
}
成员方法 TaskPool#reduce 与 TaskPool#fold
这两个方法其实本质上差不多,但是参数位存在一定差异。它们都用于数学逻辑运算。
两个方法都会取出一枚种子作为基数,随后对列表数据进行定向操作。看起来是这样的:
auto sum = taskPool.reduce!"a + b"(0, [1, 2, 3]);
在使用 reduce
时,第一位参数为种子。否则将会从列表的第一位,也就是 0 位取值作为种子。第三位则是 taskUnitSize
。
而 fold
则是反过来——第二位为种子。
如果提供了较多的模板,那么返回值将会是一个列表。刚才采用了 reduce
作为例子,那么现在采用 fold
:
auto dat = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 5);
assert(dat[0] == 10);
assert(dat[1] == 24);
信道通讯
去年的文章中已经完整的举例了信道通讯 spawn
必要的内容。
跳转 <
原子与共享
// TODO