並列処理

最近のほとんどのマイクロプロセッサは、複数のコアで構成されており、各コアは個別の処理ユニットとして動作する。これらのコアは、異なるプログラムの異なる部分を同時に実行することができる。std.parallelismモジュールの機能により、プログラムはすべてのコアを活用してより高速に実行することができる。

この章では、以下の範囲アルゴリズムについて説明する。これらのアルゴリズムは、並列に実行される操作が互いに完全に独立している場合にのみ使用すべきだ。並列とは、複数のコアで同時に操作が実行されることを意味する。

これまで作成したプログラムでは、プログラムの式は、少なくとも一般的には1行ずつ、特定の順序で実行されると仮定してきた。

++i;
++j;
D

上記のコードでは、jの値が加算される前に、iの値が加算されることを期待している。これは意味的には正しいが、実際にはほとんどない。マイクロプロセッサやコンパイラは、最適化技術を使用して、互いに依存しない変数をマイクロプロセッサのレジスタに格納する。その場合、マイクロプロセッサは、上記の加算のような操作を並行して実行する。

これらの最適化は効果的だが、非常に低レベルの操作よりも上位のレイヤーには自動的に適用できない。特定のハイレベル操作が独立しており、並列に実行できるかどうかは、プログラマーのみが判断できる。

ループでは、範囲内の要素は通常、一つずつ順番に処理され、各要素の操作は前の要素の操作に続いて実行される:

auto students =
    [ Student(1), Student(2), Student(3), Student(4) ];

foreach (student; students) {
    student.aSlowOperation();
}
D

通常、プログラムは、オペレーティングシステムによってプログラムの実行に割り当てられたマイクロプロセッサのコアの1つで実行される。foreachループは通常、要素を順番に処理するため、aSlowOperation()は各学生に対して順番に呼び出される。しかし、多くの場合、後続の学生の操作を開始する前に、先行する学生の操作が完了している必要はない。Studentオブジェクトの操作が本当に独立している場合は、システム上でアイドル状態で待機している可能性のある他のマイクロプロセッサコアを無視することは無駄だ。

長時間の操作をシミュレートするために、次の例では、core.threadモジュールからThread.sleep()を呼び出している。Thread.sleep()は、指定した時間だけ操作を中断する。Thread.sleepは、コアを一切使用せずに時間を消費するため、次の例で使用する手法としては明らかに不自然だ。現実的ではない手法だが、この章では、並列処理の威力を示すために有用だ。

import std.stdio;
import core.thread;

struct Student {
    int number;

    void aSlowOperation() {
        writefln("The work on student %s has begun", number);

        // 長期の操作をシミュレートするためにしばらく待つ
        Thread.sleep(1.seconds);

        writefln("The work on student %s has ended", number);
    }
}

void main() {
    auto students =
        [ Student(1), Student(2), Student(3), Student(4) ];

    foreach (student; students) {
        student.aSlowOperation();
    }
}
D
parallelism.1

プログラムの実行時間は、ターミナルでtimeと入力して測定できる。

time ./deneme
生徒1の作業が開始されました
生徒1の作業が終了しました
生徒2の作業が開始されました
生徒2の作業が終了しました
生徒3の作業が開始されました
生徒3の作業が終了しました
生徒4の作業が開始されました
生徒4の作業が終了しました

real    0m4.005s    ← 合計4秒
user    0m0.004s
sys     0m0.000s
Bash

学生は順に反復処理され、各学生の作業に1秒かかるため、総実行時間は4秒になる。しかし、これらの操作が4つのコアを持つ環境で実行された場合、同時に処理され、総時間は約1秒に短縮される。

その方法を見る前に、まず、std.parallelism.totalCPUsで、システムで使用可能なコアの数を決定しよう。

import std.stdio;
import std.parallelism;

void main() {
    writefln("There are %s cores on this system.", totalCPUs);
}
D
parallelism.2

この章で記述されている環境でのプログラムの出力は次の通りだ。

このシステムには4つのコアがある。
taskPool.parallel()

この関数は、parallel()と単純に呼び出すこともできる。

parallel()は、範囲内の要素に並行してアクセスする。この関数は、foreachループで効果的に使用できる。std.parallelismモジュールをインポートし、上記のプログラムでstudentsparallel(students)に置き換えるだけで、システムのすべてのコアを活用することができる。

import std.parallelism;
// ...
    foreach (student; parallel(students)) {
D

構造体およびクラス用のforeach foreachブロック内の式はopApply()メンバー関数にデリゲートとして渡されることを、すでに説明した。parallel()は、delegateの実行を各要素の個別のコアに分散する方法を知っている範囲オブジェクトを返す。

その結果、Student範囲をparallel()経由で渡すと、4 コアのシステムでは上記のプログラムは 1 秒で終了する。

time ./deneme
生徒2の作業が開始
生徒1の作業が開始
生徒4の作業が開始
生徒3の作業が開始
生徒1の作業が終了
生徒2の作業が終了
生徒4の作業が終了
生徒3の作業が終了

real    0m1.005s    ← 今だけ1秒
user    0m0.004s
sys     0m0.004s
Bash

注釈:プログラムの実行時間は、システムによって異なる場合があるが、おおよそ"4 秒÷コア数"になると予想される。

プログラムの特定の部分を実行する流れを、実行スレッドまたは スレッドと呼ぶ。プログラムは、同時にアクティブに実行されている複数のスレッドで構成される場合がある。オペレーティングシステムは、各スレッドをコアで起動して実行し、他のスレッドを実行するためにそのスレッドを一時停止する。各スレッドの実行には、起動と一時停止が何度も繰り返される場合がある。

ある時点でアクティブなすべてのプログラムのスレッドは、マイクロプロセッサのコア上で実行される。オペレーティングシステムは、各スレッドをいつ、どのような条件で開始および一時停止するかを決定する。そのため、aSlowOperation()によって出力されるメッセージは、上記の出力では順番が混在している。Studentオブジェクトの操作が互いに完全に独立している場合は、スレッドの実行順序が確定しないことは問題にならないかもしれない。

各要素に適用される操作が各反復で独立している場合にのみ、parallel()を呼び出すことは、プログラマーの責任だ。例えば、出力にメッセージが特定の順序で表示されることが重要な場合、上記のプログラムではparallel()を呼び出すことはエラーとみなすべきだ。他のスレッドに依存するスレッドをサポートするプログラミングモデルは、並行処理と呼ばれる。並行処理は、次の章で扱うトピックだ。

並列foreachが終了するまでに、ループ内のすべての操作がすべての要素に対して完了している。foreachループの後は、プログラムを安全に続行できる。

作業単位のサイズ

parallel()の2番目のパラメーターはオーバーロードされており、一部のケースでは無視される:

/* ... */ = parallel(range, work_unit_size = 100);
D
Task

プログラムの他の操作と並行して実行される操作は、タスクと呼ばれる。タスクは、std.parallelism.Task型で表される。

実際、parallel()は各ワーカースレッドごとに新しいTaskオブジェクトを構築し、そのタスクを自動的に開始する。parallel()は、すべてのタスクが完了するまで待機し、最後にループから退出する。parallel()は、タスクの 構築開始待機を自動的に行うため、非常に便利だ。

タスクが範囲の要素に対応しない、または範囲の要素で表現できない場合、この3つのステップはプログラマが明示的に処理することができる。task()はタスクオブジェクトを構築し、executeInNewThread()はタスクオブジェクトを開始し、yieldForce()はタスクオブジェクトを待機する。この3つの関数の詳細については、次のプログラムのコメントで説明する。

次のプログラムでは、anOperation()関数が2回起動される。idの最初の文字を出力して、どのタスクを処理しているかを示す。

注釈:通常、stdoutのような出力ストリームに出力された文字は、すぐには出力には表示されない。その代わりに、1行の出力が入力されるまで出力バッファに格納される。writeは改行文字を出力しないため、次のプログラムの並列実行を確認するには、行の終わりに達する前にstdout.flush()を呼び出して、バッファの内容をstdoutに送信する必要がある。

import std.stdio;
import std.parallelism;
import std.array;
import core.thread;

/* 0.5秒ごとに'id'の最初の文字を出力する。
 * 計算を行う関数をシミュレートするために、
 * 任意に値1を返す。この結果は後でmainで使用される。 */
int anOperation(string id, int duration) {
    writefln("%s will take %s seconds", id, duration);

    foreach (i; 0 .. (duration * 2)) {
        Thread.sleep(500.msecs);  /* 0.5秒 */
        write(id.front);
        stdout.flush();
    }

    return 1;
}

void main() {
    /* anOperation()を実行するタスクオブジェクトを構築する。
     * ここで指定する関数パラメータは、
     * タスク関数の関数パラメータとして
     * 渡される。 */
    auto theTask = task!anOperation("theTask", 5);

    /* タスクオブジェクトを開始する */
    theTask.executeInNewThread();

    /* 'theTask'の実行が継続すると、'anOperation()'が
     * 再び呼び出される。今回はmain内で直接呼び出される。 */
    immutable result = anOperation("main's call", 3);

    /* この時点で、mainから直接開始された操作は
     * 完了していることは確実だ。
     * これは、この操作はタスクとしてではなく、通常の
     * 関数呼び出しによって開始されたためだ。 */

    /* 一方、この時点では、'theTask'がその操作を
     * 完了したかどうかはまだ
     * 定かではない。yieldForce()は、タスクが
     * その操作を完了するまで待機し、タスクが完了した場合に
     * のみ戻ってくる。その戻り値は、
     * タスク関数、つまりanOperation()の戻り値である。 */
    immutable taskResult = theTask.yieldForce();

    writeln();
    writefln("All finished; the result is %s.",
             result + taskResult);
}
D
parallelism.3

プログラムの出力は次のようになるはずだ。mtの文字が混在して表示されていることは、操作が並列に実行されていることを示している:

mainの呼び出しには3秒かかる
theTaskには5秒かかる
mtmttmmttmmttttt
すべて完了; 結果は2だ。

上記のタスク関数は、task()のテンプレートパラメータとしてtask!anOperationと指定されている。この方法は、ほとんどの場合にうまく機能するが、テンプレートの章で見たように、テンプレートのインスタンス化はそれぞれ異なる型になる。一見同等のタスクオブジェクトが実際には異なる型になってしまう場合、この区別は望ましくない場合もある。

例えば、次の2つの関数は同じシグネチャを持っているが、task()関数テンプレートを呼び出すことで生成される2つのTaskインスタンスは、異なる型になる。その結果、これらは同じ配列のメンバにはなれない。

import std.parallelism;

double foo(int i) {
    return i * 1.5;
}

double bar(int i) {
    return i * 2.5;
}

void main() {
    auto tasks = [ task!foo(1),
                   task!bar(2) ];    // ← コンパイルエラー
}
D
parallelism.4
エラー: 型が一致しない((task(1)) : (task(2))):
'Task!(foo, int)*'と'Task!(bar, int)*'

task()の別のオーバーロードでは、代わりに最初の関数パラメータとして関数が指定される。

void someFunction(int value) {
    // ...
}

auto theTask = task(&someFunction, 42);
D

この方法では、Taskテンプレートの異なるインスタンス化が発生しないため、このようなオブジェクトを同じ配列に格納することが可能になる:

import std.parallelism;

double foo(int i) {
    return i * 1.5;
}

double bar(int i) {
    return i * 2.5;
}

void main() {
    auto tasks = [ task(&foo, 1),
                   task(&bar, 2) ];    // ← コンパイルする
}
D
parallelism.5

ラムダ関数や、opCallメンバーを定義する型のオブジェクトも、タスク関数として使うことができる。次の例は、ラムダを実行するタスクを開始する。

auto theTask = task((int value) {
                        /* ... */
                    }, 42);
D
例外

タスクは個別のスレッドで実行されるため、タスクがスローする例外は、そのタスクを起動したスレッドではキャッチできない。そのため、スローされた例外はタスク自体によって自動的にキャッチされ、yieldForce()などのTaskメンバー関数が呼び出されたときに再スローされる。これにより、メインスレッドはタスクによってスローされた例外をキャッチすることができる。

import std.stdio;
import std.parallelism;
import core.thread;

void mayThrow() {
    writeln("mayThrow() is started");
    Thread.sleep(1.seconds);
    writeln("mayThrow() is throwing an exception");
    throw new Exception("Error message");
}

void main() {
    auto theTask = task!mayThrow();
    theTask.executeInNewThread();

    writeln("main is continuing");
    Thread.sleep(3.seconds);

    writeln("main is waiting for the task");
    theTask.yieldForce();
}
D
parallelism.6

プログラムの出力は、タスクによってスローされたキャッチされていない例外によってプログラム全体がすぐに終了しないことを示している(タスクだけが終了する)。

mainは継続中
mayThrow()が開始
mayThrow()が例外をスローしている                 ← スロー
mainはタスクを待機中
object.Exception@deneme.d(10): エラーメッセージ        ← 終了

yieldForce() try-catchブロック内で呼び出して、タスクによってスローされた例外をキャッチすることができる。これは、単一スレッドの場合とは異なるので注意。この章までで作成してきたサンプルのような単一スレッドのプログラムでは、try-catchは例外をスローする可能性のあるコードをラップする。並列処理では、yieldForce()をラップする。

try {
    theTask.yieldForce();

} catch (Exception exc) {
    writefln("Detected an error in the task: '%s'", exc.msg);
}
D

この場合、プログラムは終了せずに、メインスレッドで例外がキャッチされる。

mainは継続中
mayThrow()が開始
mayThrow()が例外をスローしている                 ← スロー
mainはタスクを待機中
Detected an error in the task: 'エラーメッセージ'      ← キャッチ
Task

タスクの完了を待つ関数は3つある。

ほとんどの場合、タスクの完了を待つにはyieldForce()を呼び出すのが最も適している。この関数は、タスクが完了するまでyieldForce()を呼び出したスレッドをサスペンドする。spinForce()は、待機中にマイクロプロセッサをビジー状態にするが、タスクがごく短時間で完了すると予想される場合に適している。workForce()は、現在のスレッドをサスペンドするよりも、他のタスクを開始したい場合に呼び出すことができる。

Taskの他のメンバー関数については、Phobosのオンラインドキュメントを参照。

taskPool.asyncBuf()

parallel()と同様に、asyncBuf()InputRangeの範囲を並列に反復処理する。範囲から生成された要素をバッファに格納し、そのバッファから要素をユーザーに提供する。

潜在的に完全に遅延評価の入力範囲を完全に即時評価の範囲にしないため、要素を波状に反復処理する。並列に一定数の要素を準備すると、それらの要素がpopFront()によって消費されるまで待機し、次に次の波の要素を生成する。

asyncBuf()範囲と、各波で利用可能にする要素数を決定するオプションのバッファサイズを受け取る:

auto elements = taskPool.asyncBuf(range, buffer_size);
D

asyncBuf()の効果を確認するため、反復に半秒、各要素の処理に半秒かかる範囲を使用する。この範囲は、指定された上限までの整数を単純に生成する:

import std.stdio;
import core.thread;

struct Range {
    int limit;
    int i;

    bool empty() const {
        return i >= limit;
    }

    int front() const {
        return i;
    }

    void popFront() {
        writefln("Producing the element after %s", i);
        Thread.sleep(500.msecs);
        ++i;
    }
}

void main() {
    auto range = Range(10);

    foreach (element; range) {
        writefln("Using element %s", element);
        Thread.sleep(500.msecs);
    }
}
D
parallelism.7

要素は遅延生成され、使用される。各要素の処理に1秒かかるため、このプログラムでは全体の範囲を処理するのに10秒かかる:

time ./deneme
要素0を使用
0 の後に要素を生成
要素1を使用
1 の後に要素を生成
要素2を使用
...
8の後に要素を生成
要素9を使用
9の後に要素を生成

real    0m10.007s    ← 合計10秒
user    0m0.004s
sys     0m0.000s
Bash

その出力によると、要素は順序通りに生成され使用されている。

一方、前の要素が処理されるのを待たずに、次の要素の生成を開始する必要はないかもしれない。前の要素が使用中である間にも他の要素を生成できれば、プログラムの処理時間が短縮される:

import std.parallelism;
//...
    foreach (element; taskPool.asyncBuf(range, 2)) {
D

上記の呼び出しでは、asyncBuf()はバッファに2つの要素を準備する。要素は使用中に並列に生成される:

time ./deneme
0の後に要素を生成する
1の後に要素を生成する
要素0を使用する
2の後に要素を生成する
要素1を使用する
3の後に要素を生成する
要素2を使用する
4の後に要素を生成する
要素3を使用する
5の後に要素を生成する
要素4を使用する
要素6の後に要素を生成する
要素7の後に要素を生成する
要素5を使用する
要素6を使用する
要素8の後に要素を生成する
要素9の後に要素を生成する
要素7を使用する
要素8を使用する
要素9を使用する

real    0m6.007s    ← 今6秒
user    0m0.000s
sys     0m0.004s
Bash

バッファサイズのデフォルト値は 100 だ。最適なバッファサイズは、状況によって異なる。

asyncBuf()は、foreachループの外でも使うことができる。例えば、次のコードでは、asyncBuf()の戻り値を、セミイージーに動作するInputRangeとして使っている。

auto range = Range(10);
auto asyncRange = taskPool.asyncBuf(range, 2);
writeln(asyncRange.front);
D
taskPool.map()

taskPool.map()を説明する前に、std.algorithmモジュールからmap()について説明しておくとわかりやすい。std.algorithm.mapは、多くの関数型言語でよく見られるアルゴリズムである。これは、範囲内の要素を1つずつ関数に呼び出し、その関数を各要素に対して呼び出した結果で構成される範囲を返す。これは遅延アルゴリズムだ。つまり、必要に応じて関数を呼び出す。(std.algorithm.eachもある。これは、結果を生成するのではなく、各要素に対して副作用を生成するためのものだ。

std.algorithm.mapが遅延して動作するという事実は、多くのプログラムにおいて非常に強力だ。しかし、いずれにせよすべての要素に対して関数を呼び出す必要があり、各要素に対する操作が互いに独立している場合は、遅延は並列実行よりも不必要に速度を低下させる可能性がある。std.parallelismモジュールにあるtaskPool.map()およびtaskPool.amap()は、マルチコアの利点を活用し、多くの場合、より高速に実行される。

Studentの例を使って、これら3つのアルゴリズムを比較しよう。Studentには、学生の平均成績を返すメンバー関数があると仮定しよう。並列アルゴリズムがどのように高速であるかを実証するために、この関数をThread.sleep()を使って再び遅くしよう。

std.algorithm.mapは、関数をテンプレートパラメータとして、範囲を関数パラメータとして受け取る。この関数は、その関数を範囲の要素に適用した結果で構成される範囲を返す。

auto result_range = map!func(range);
D

この関数は、前の章で見たように、=>構文でラムダ式として指定することができる。次のプログラムは、map()を使用して、各要素に対してメンバー関数averageGrade()を呼び出す。

import std.stdio;
import std.algorithm;
import core.thread;

struct Student {
    int number;
    int[] grades;

    double averageGrade() {
        writefln("Started working on student %s",
                 number);
        Thread.sleep(1.seconds);

        const average = grades.sum / grades.length;

        writefln("Finished working on student %s", number);
        return average;
    }
}

void main() {
    Student[] students;

    foreach (i; 0 .. 10) {
        /* 各生徒に2つの成績 */
        students ~= Student(i, [80 + i, 90 + i]);
    }

    auto results = map!(a => a.averageGrade)(students);

    foreach (result; results) {
        writeln(result);
    }
}
D
parallelism.8

プログラムの出力は、map()が遅延評価で動作することを示している。foreachループが反復するたびに、各結果に対してaverageGrade()が呼び出される:

time ./deneme
生徒0の作業を開始した
生徒0の作業を完了した
85                   ← foreachの反復回数として計算
生徒1の作業を開始した
生徒1の作業を完了した
86
...
生徒9の作業を開始した
生徒9の作業を完了した
94

real    0m10.006s    ← 合計10秒
user    0m0.000s
sys     0m0.004s
Bash

std.algorithm.mapがイアグアルゴリズムだった場合、操作の開始と終了に関するメッセージはすべてプログラムの先頭に一括して出力される。

taskPool.map() std.parallelismモジュールは、基本的にstd.algorithm.mapと同じように動作する。唯一の違いは、関数呼び出しを半熱心に実行し、結果をバッファに格納して、必要に応じて提供することだ。このバッファのサイズは、2番目のパラメータで決定される。例えば、次のコードは、3つの要素の関数呼び出しの結果を一度に準備する。

import std.parallelism;
// ...
double averageGrade(Student student) {
    return student.averageGrade;
}
// ...
    auto results = taskPool.map!averageGrade(students, 3);
D

注釈:上記の独立したaverageGrade()関数は、TaskPool.mapのようなメンバー関数テンプレートでローカルデリゲートを使用する場合の制限により必要だ。この独立した関数がないと、コンパイルエラーになる。

auto results =
    taskPool.map!(a => a.averageGrade)(students, 3);  // ← コンパイルエラー
D

今回は、3つの要素ずつ波状に操作が実行される:

time ./deneme
生徒1の作業を開始   ← 並行して
生徒2の作業を開始  ← ただし順序は不確定
生徒0の作業を開始
生徒1の作業を完了
生徒2の作業を完了
生徒0の作業を完了
85
86
87
生徒4の作業を開始した
生徒5の作業を開始した
生徒3の作業を開始した
生徒4の作業を完了した
生徒3の作業を完了した
生徒5の作業を完了した
88
89
90
生徒7の作業を開始した
生徒8の作業を開始した
生徒6の作業を開始した
生徒7の作業を完了した
生徒6の作業を完了した
生徒8の作業を完了した
91
92
93
生徒9の作業を開始した
生徒9の作業を完了した
94

real    0m4.007s    ← 合計4秒
user    0m0.000s
sys     0m0.004s
Bash

map()の2番目のパラメータは、asyncBuf()と同じ意味だ。map()が結果を格納するために使用するバッファのサイズを決定する。3番目のパラメータは、parallel()と同じ作業単位のサイズだ。違いは、デフォルト値がsize_t.maxであることだ。

/* ... */ = taskPool.map!func(range,
                              buffer_size = 100
                              work_unit_size = size_t.max);
D
taskPool.amap()

Parallelamap()は、parallelmap()と2つの違いを除いて同じように動作する。

auto results = taskPool.amap!averageGrade(students);
D

イージングされているため、amap()が返される時点ですべての結果が準備完了になっている:

time ./deneme
生徒1の作業を開始した    ← すべてが前もって実行される
生徒0の作業を開始した
生徒2の作業を開始した
生徒3の作業を開始した
生徒1の作業を完了した
生徒4の作業を開始した
生徒2の作業を完了した
生徒3の作業を完了した
生徒6の作業を開始した
生徒0の作業を完了した
生徒7の作業を開始した
生徒5の作業を開始した
生徒4の作業を完了した
生徒8の作業を開始しました
生徒6の作業を完了しました
生徒9の作業を開始しました
生徒7の作業を完了しました
生徒5の作業を完了しました
生徒8の作業を完了しました
生徒9の作業を完了しました
85
86
87
88
89
90
91
92
93
94

real    0m3.005s    ← 合計3秒
user    0m0.000s
sys     0m0.004s
Bash

amap() map()よりも高速に動作するが、すべての結果を格納できる十分なサイズの配列を使用する代わりに、メモリ消費量が増加する。

amap()のオプションの2番目のパラメーターは、作業単位のサイズでもある:

auto results = taskPool.amap!averageGrade(students, 2);
D

結果は、amap()の3番目のパラメーターとして渡されるRandomAccessRangeに格納することもできる:

double[] results;
results.length = students.length;
taskPool.amap!averageGrade(students, 2, results);
D
taskPool.reduce()

map()同様、reduce()std.algorithmモジュールから説明した方がわかりやすい。

reduce()は、Rangesの章で見たstd.algorithm.foldと同等だ。2つの主な違いは、関数パラメータの順番が逆になっていることだ。(そのため、連鎖した範囲式でUFCSを活用できるfold()を、並列化されていないコードには使うことをお勧めする。)

reduce()は、多くの関数型言語でよく見られるもう1つの高レベルアルゴリズムだ。map()と同様に、1つ以上の関数をテンプレートパラメータとして取る。関数パラメータとしては、結果の初期値として使用する値と、範囲を取る。reduce()は、結果の現在の値と範囲の各要素を使用して関数を呼び出す。初期値が指定されていない場合は、代わりに範囲の最初の要素が使用される。

実装でresultという変数が定義されていると仮定すると、reduce()の動作は次の手順で説明できる。

  1. 初期値をresult
  2. result = func(result, element)式を実行する
  3. の最終値を返すresult

例えば、配列の要素の2乗の合計は、次のプログラムのように計算できる。

import std.stdio;
import std.algorithm;

void main() {
    writeln(reduce!((a, b) => a + b * b)(0, [5, 10]));
}
D
parallelism.9

上記のプログラムのように、関数が=>構文で指定されている場合、最初のパラメータ(ここではa)は結果の現在の値(上記のパラメータ0によって初期化)を表し、2番目のパラメータ(ここではb)は現在の要素を表す。

このプログラムは、25と100の和、5と10の平方を10ずつ出力する。

125

その動作から明らかなように、reduce()は実装にループを使用している。このループは通常、単一のコアで実行されるため、各要素の関数呼び出しが互いに独立している場合、不必要に遅くなる可能性がある。このような場合、std.parallelismモジュールにあるtaskPool.reduce()を使用すると、すべてのコアを活用することができる。

この例を見るために、再び人為的に速度を落とした関数を使ってreduce()を使ってみよう。

import std.stdio;
import std.algorithm;
import core.thread;

int aCalculation(int result, int element) {
    writefln("started  - element: %s, result: %s",
             element, result);

    Thread.sleep(1.seconds);
    result += element;

    writefln("finished - element: %s, result: %s",
             element, result);

    return result;
}

void main() {
    writeln("Result: ", reduce!aCalculation(0, [1, 2, 3, 4]));
}
D
parallelism.10

reduce()は、要素を順番に使用して結果の最終値に到達する。

time ./deneme
開始した  - 要素: 1, 結果: 0
終了した - 要素: 1, 結果: 1
開始した  - 要素: 2, 結果: 1
終了した - 要素: 2, 結果: 3
開始した  - 要素: 3, 結果: 3
終了した - 要素: 3, 結果: 6
開始した  - 要素: 4, 結果: 6
終了した - 要素: 4, 結果: 10
結果: 10

real    0m4.003s    ← 合計4秒
user    0m0.000s
sys     0m0.000s
Bash

parallel()およびmap()の例と同様に、std.parallelismモジュールをインポートし、taskPool.reduce()を呼び出すだけで、すべてのコアを活用することができる。

import std.parallelism;
// ...
    writeln("Result: ", taskPool.reduce!aCalculation(0, [1, 2, 3, 4]));
D

ただし、taskPool.reduce()の動作には重要な違いがある。

他の並列アルゴリズムと同様、taskPool.reduce()は、異なるタスクの要素を使用して関数を並行して実行する。各タスクは、割り当てられた要素を処理し、そのタスクの要素に対応するresultを計算する。reduce()は1つの初期値のみで呼び出されるため、すべてのタスクは、その初期値を使用して、自身のresult(上記のパラメータ0)を初期化する必要がある。

各タスクが生成する結果の最終値は、同じresult計算で最後に1回使用される。これらの最終計算は、並行ではなく順番に実行される。そのため、この章のような短い例では、taskPool.reduce()の実行速度が低下する場合がある。これは、次の出力で確認できる。

すべてのタスクに同じ初期値が使用され、事実上計算で複数回使用されるため、taskPool.reduce()std.algorithm.reduce()が計算した結果とは異なる結果を計算する可能性がある。そのため、初期値は、実行する計算の識別値である必要がある。例えば、この例では、追加の効果のない0である。

さらに、結果は連続した計算で同じ関数によって最後に1回使用されるため、関数が取るパラメータの型は、関数が返す値の型と互換性がある必要がある。

taskPool.reduce() これらの考慮事項の下でのみ使用すべきだ。

import std.parallelism;
// ...
    writeln("Result: ", taskPool.reduce!aCalculation(0, [1, 2, 3, 4]));
D

プログラムの出力は、まず並列で計算が実行され、その後その結果が順次計算されることを示している。順次実行される計算はハイライト表示されている:

time ./deneme
開始した  - 要素: 3, 結果: 0 ← まず、並行するタスク
開始した  - 要素: 2, 結果: 0
開始した  - 要素: 1, 結果: 0
開始した  - 要素: 4, 結果: 0
終了した - 要素: 3, 結果: 3
終了した - 要素: 1, 結果: 1
開始した  - 要素: 1, 結果: 0 ← その後、その結果は順番に
終了した - 要素: 4, 結果: 4
終了した - 要素: 2, 結果: 2
終了した - 要素: 1, 結果: 1
開始した  - 要素: 2, 結果: 1
終了した - 要素: 2, 結果: 3
開始した  - 要素: 3, 結果: 3
終了した - 要素: 3, 結果: 6
開始した  - 要素: 4, 結果: 6
終了した - 要素: 4, 結果: 10
結果: 10

real    0m5.006s    ← この例では、並列リデュースはより遅くなる
user    0m0.004s
sys     0m0.000s
Bash

並列reduce()は、数学定数π(π)の積分計算など、他の多くの計算でも高速だ。

複数の関数とタプル結果

std.algorithm.map()taskPool.map()taskPool.amap()、およびtaskPool.reduce()は、複数の関数を受け取ることができ、その場合は、結果はTupleとして返される。Tuple型は、前のタプルの章で説明した。個々の関数の結果は、関数が指定された順にタプルの要素に対応する。例えば、最初の関数の結果は、タプルの最初のメンバーになる。

次のプログラムは、std.algorithm.mapを使用した複数の関数を示している。以下のquarterOf()およびtenTimes()関数でわかるように、関数の戻り値の型は同じである必要はない。その場合、タプルの要素の型も異なる。

import std.stdio;
import std.algorithm;
import std.conv;

double quarterOf(double value) {
    return value / 4;
}

string tenTimes(double value) {
    return to!string(value * 10);
}

void main() {
    auto values = [10, 42, 100];
    auto results = map!(quarterOf, tenTimes)(values);

    writefln(" Quarters  Ten Times");

    foreach (quarterResult, tenTimesResult; results) {
        writefln("%8.2f%8s", quarterResult, tenTimesResult);
    }
}
D
parallelism.11

出力:

4分の110回分
2.50100
10.50420
25.001000

taskPool.reduce()の場合、結果の初期値はタプルとして指定する必要がある。

taskPool.reduce!(foo, bar)(tuple(0, 1), [1, 2, 3, 4]);
D
TaskPool

裏では、std.parallelismモジュール内の並列アルゴリズムはすべて、TaskPoolコンテナの要素であるタスクオブジェクトを使用している。通常、すべてのアルゴリズムは、taskPoolという名前の同じコンテナオブジェクトを使用する。

taskPoolこのコンテナは、プログラムが実行される環境に応じて適切な数のタスクを含む。そのため、通常は他のTaskPoolオブジェクトを作成する必要はない。それでも、必要に応じて明示的なTaskPoolオブジェクトを作成して使用することもできる。

TaskPoolコンストラクタは、後でそれを通じて開始される並列操作で使用するスレッドの数を引数として受け取る。スレッドの数のデフォルト値は、システム上のコア数の 1 減だ。この章で説明したすべての機能は、別のTaskPoolオブジェクトにも適用できる。

次の例は、ローカルTaskPoolオブジェクトでparallel()を呼び出している。

import std.stdio;
import std.parallelism;

void main() {
    auto workers = new TaskPool(2);

    foreach (i; workers.parallel([1, 2, 3, 4])) {
        writefln("Working on %s", i);
    }

    workers.finish();
}
D

TaskPool.finish() 現在のすべてのタスクが完了した際にオブジェクトの処理を停止するように指示する。

要約