メッセージパッシングによる並行処理

並行性は、前の章で取り上げた並列処理と似ているが、異なる概念だ。この2つの概念はどちらもスレッド上でプログラムを実行することに関わっており、並列処理は並行性に基づいてるため、しばしば混同される。

並列処理と並行処理の違いは以下の通りだ。

Dは、メッセージパッシングとデータ共有の両方の並行処理モデルをサポートしている。この章ではメッセージパッシングについて、次の章ではデータ共有について説明する。

概念

スレッド: オペレーティングシステムは、スレッドと呼ばれる作業単位としてプログラムを実行する。Dプログラムは、オペレーティングシステムによってそのプログラムに割り当てられたスレッドで、main()で実行を開始する。通常、プログラムのすべての操作はそのスレッドで実行される。プログラムは、複数のタスクを同時に実行できるように、他のスレッドを自由に開始することができる。実際、前の章で説明したタスクは、std.parallelismによって自動的に開始されるスレッドに基づいている。

オペレーティングシステムは、予測できないタイミングで、予測できない時間だけスレッドを一時停止することができる。その結果、変数を加算するといった単純な操作でさえ、操作の途中で一時停止される可能性がある。

++i;
D

上記の操作は、変数の値の読み取り、値の加算、新しい値の変数への再代入という3つのステップで構成されている。スレッドは、これらのステップの間の任意の時点で一時停止され、予測できない時間後に再開される場合がある。

メッセージ: スレッド間で渡されるデータは、メッセージと呼ばれる。メッセージは、任意の型および任意の数の変数で構成される。

スレッド識別子: すべてのスレッドにはIDが割り当てられており、メッセージの受信者を指定するために使用される。

オーナー: 他のスレッドを開始するスレッドは、新しいスレッドのオーナーと呼ばれる。

ワーカー: オーナーによって開始されたスレッドは、ワーカーと呼ばれる。

スレッドの開始

spawn()は、関数ポインタをパラメータとして取り、その関数から新しいスレッドを開始する。その関数によって実行される操作(その関数が呼び出す他の関数を含む)は、すべて新しいスレッドで実行される。spawn()で開始されるスレッドと、task()との主な違いは、spawn()ではスレッド同士がメッセージを送信できる点だ。

新しいスレッドが開始されると、オーナーとワーカーは独立したプログラムのように別々に実行を開始する:

import std.stdio;
import std.concurrency;
import core.thread;

void worker() {
    foreach (i; 0 .. 5) {
        Thread.sleep(500.msecs);
        writeln(i, " (worker)");
    }
}

void main() {
    spawn(&worker);

    foreach (i; 0 .. 5) {
        Thread.sleep(300.msecs);
        writeln(i, " (main)");
    }

    writeln("main is done.");
}
D
concurrency.1

この章の例では、スレッドが同時に実行されることを示すために、Thread.sleepを呼び出してスレッドの速度を遅くしている。プログラムの出力は、main()を実行するスレッドと、spawn()によって起動されたスレッドの2つのスレッドが、同時に独立して実行されていることを示している。

0main
0worker
1main
2main
1worker
3main
2worker
4main
mainは完了した。
3worker
4worker

プログラムは自動的にすべてのスレッドの実行が完了するまで待機する。上記の出力では、main()が"main is done"と表示して終了した後も、worker()が実行を継続していることから、これが確認できる。

スレッド関数が受け取るパラメータは、spawn()の2番目以降の引数として渡される。次のプログラムでは、2つのワーカスレッドがそれぞれ4つの数字を出力する。これらのスレッドは、スレッド関数のパラメータとして開始番号を受け取る。

import std.stdio;
import std.concurrency;
import core.thread;

void worker(int firstNumber) {
    foreach (i; 0 .. 4) {
        Thread.sleep(500.msecs);
        writeln(firstNumber + i);
    }
}

void main() {
    foreach (i; 1 .. 3) {
        spawn(&worker, i * 10);
    }
}
D
concurrency.2

スレッドの1つの出力をハイライト表示する:

10
20
11
21
12
22
13
23

出力の行は、オペレーティングシステムによってスレッドが一時停止および再開される方法によって、時間によって異なる場合がある。

すべてのオペレーティングシステムには、同時に存在できるスレッドの数に制限がある。この制限は、ユーザーごとに、システム全体に、あるいはその他の設定で設定できる。システム内のコアの数よりも、忙しく動作しているスレッドの数が多いと、システムの全体的なパフォーマンスが低下する。ある時点で忙しく動作しているスレッドは、その時点でCPUバインドされていると言う。一方、一部のスレッドは、ユーザーからの入力、ネットワーク接続からのデータ、Thread.sleep呼び出しの完了など、何らかのイベントが発生するのを待つために、その時間の多くを費やす。このようなスレッドは、その時点でI/Oに縛られていると言う。スレッドの大部分がI/Oに縛られている場合、プログラムはコア数よりも多くのスレッドを開始しても、パフォーマンスの低下なしに実行可能である。プログラムのパフォーマンスに関する設計決定では常にそうであるように、実際に測定を行って、それが本当に正しいかどうかを正確に確認する必要がある。

スレッド識別子

thisTid()は、現在のスレッドの識別子を返す。通常、関数括弧は省略して呼び出される。

import std.stdio;
import std.concurrency;

void printTid(string tag) {
    writefln("%s: %s", tag, thisTid);
}

void worker() {
    printTid("Worker");
}

void main() {
    spawn(&worker);
    printTid("Owner ");
}
D
concurrency.3

thisTid()の戻り値の型はTidで、プログラムには何の意味もない。toString()関数もオーバーロードされていない。

所有者Tid(std.concurrency.MessageBox)
ワーカーTid(std.concurrency.MessageBox)

これまで無視してきたspawn()の戻り値は、ワーカスレッドのIDである。

Tid myWorker = spawn(&worker);
D

逆に、ワーカスレッドの所有者は、ownerTid()関数によって取得される。

要約すると、所有者はownerTidによって識別され、ワーカーはspawn()の戻り値によって識別される。

メッセージの送信

send()はメッセージを送信し、receiveOnly()は特定の型のメッセージを待つ。(prioritySend()receive()receiveTimeout()もあるが、これについては後で説明する。)

次のプログラムでは、所有者はワーカーに型intのメッセージを送信し、ワーカーから型doubleのメッセージを待つ。所有者が負のintを送信するまで、スレッドはメッセージの送受信を続ける。以下は所有者スレッドだ。

void main() {
    Tid worker = spawn(&workerFunc);

    foreach (value; 1 .. 5) {
        worker.send(value);
        double result = receiveOnly!double();
        writefln("sent: %s, received: %s", value, result);
    }

    /* ワーカーに負の値を送信して、
     * ワーカーを終了させる。 */
    worker.send(-1);
}
D

main() spawn()の戻り値をworkerという名前で格納し、ワーカーにメッセージを送信する際にその変数を使用する。

一方、ワーカは必要なメッセージをintとして受け取り、その値を計算に使用し、結果をdouble型として所有者に送信する。

void workerFunc() {
    int value = 0;

    while (value >= 0) {
        value = receiveOnly!int();
        double result = to!double(value) / 5;
        ownerTid.send(result);
    }
}
D

メインスレッドは、送信したメッセージと受信したメッセージを報告する:

送信受信
10.2
20.4
30.6
40.8

同じメッセージの一部として複数の値を送信することも可能だ。次のメッセージは3つの部分で構成されている。

ownerTid.send(thisTid, 42, 1.5);
D

1つのメッセージの一部として渡される値は、受信側ではタプルとして表示される。このような場合、receiveOnly()のテンプレートパラメータは、タプルメンバーの型と一致する必要がある。

/* Tid、int、およびdoubleで構成されるメッセージを待つ。 */
auto message = receiveOnly!(Tid, int, double)();

auto sender   = message[0];    // Tid型
auto integer  = message[1];    // int型
auto floating = message[2];    // double型
D

型が一致しない場合、MessageMismatch例外がスローされる。

import std.concurrency;

void workerFunc() {
    ownerTid.send("hello");    // ← 文字列を送信
}

void main() {
    spawn(&workerFunc);

    auto message = receiveOnly!double();    // ← doubleを期待
}
D
concurrency.4

出力:

std.concurrency.MessageMismatch@std/concurrency.d(235):
予期しないメッセージ型: 'double'を期待したが、'immutable(char)[]'を受け取った
Undefined

ワーカーがスローする例外は、所有者はキャッチできない。1つの解決策は、ワーカーがメッセージとして送信される例外をキャッチするようにすることだ。これについては、以下で説明する。

これまで見た内容をシミュレーションプログラムで活用しよう。

以下のプログラムは、2 次元空間内でランダムに移動する独立したロボットをシミュレートする。各ロボットの移動は、開始時に3つの情報を取得する別々のスレッドで処理される:

この情報は、次のJob構造体に格納することができる。

struct Job {
    size_t robotId;
    Position origin;
    Duration restDuration;
}
D

各ロボットを動かすスレッド関数は、ロボットのIDとその動きを、所有者スレッドに連続的に送信する。

void robotMover(Job job) {
    Position from = job.origin;

    while (true) {
        Thread.sleep(job.restDuration);

        Position to = randomNeighbor(from);
        Movement movement = Movement(from, to);
        from = to;

        ownerTid.send(MovementMessage(job.robotId, movement));
    }
}
D

所有者は、これらのメッセージを無条件ループで待機する。メッセージの一部として送信されるロボットIDでロボットを識別する。所有者は、すべての移動を単純に表示する:

while (true) {
    auto message = receiveOnly!MovementMessage();

    writefln("%s %s",
             robots[message.robotId], message.movement);
}
D

このシンプルなプログラムでは、すべてのメッセージはワーカーからオーナーに送信される。メッセージの受け渡しは、多くのプログラムではより複雑な通信を伴う。

以下に完全なプログラムを示す:

import std.stdio;
import std.random;
import std.string;
import std.concurrency;
import core.thread;

struct Position {
    int line;
    int column;

    string toString() {
        return format("%s,%s", line, column);
    }
}

struct Movement {
    Position from;
    Position to;

    string toString() {
        return ((from == to)
                ? format("%s (idle)", from)
                : format("%s -> %s", from, to));
    }
}

class Robot {
    string image;
    Duration restDuration;

    this(string image, Duration restDuration) {
        this.image = image;
        this.restDuration = restDuration;
    }

    override string toString() {
        return format("%s(%s)", image, restDuration);
    }
}

/* 0,0付近のランダムな位置を返す。 */
Position randomPosition() {
    return Position(uniform!"[]"(-10, 10),
                    uniform!"[]"(-10, 10));
}

/* 指定された座標から最大1ステップ離れた位置を返す。 */
int randomStep(int current) {
    return current + uniform!"[]"(-1, 1);
}

/* 指定された位置の隣の位置を返す。
 * 8方向のいずれかの隣の位置、
 * または指定された位置自体になる場合もある。 */
Position randomNeighbor(Position position) {
    return Position(randomStep(position.line),
                    randomStep(position.column));
}

struct Job {
    size_t robotId;
    Position origin;
    Duration restDuration;
}

struct MovementMessage {
    size_t robotId;
    Movement movement;
}

void robotMover(Job job) {
    Position from = job.origin;

    while (true) {
        Thread.sleep(job.restDuration);

        Position to = randomNeighbor(from);
        Movement movement = Movement(from, to);
        from = to;

        ownerTid.send(MovementMessage(job.robotId, movement));
    }
}

void main() {
    /* さまざまなrestDurationを持つロボット。 */
    Robot[] robots = [ new Robot("A",  600.msecs),
                       new Robot("B", 2000.msecs),
                       new Robot("C", 5000.msecs) ];

    /* 各ロボットの移動スレッドを開始する。 */
    foreach (robotId, robot; robots) {
        spawn(&robotMover, Job(robotId,
                               randomPosition(),
                               robot.restDuration));
    }

    /* ロボットの移動に関する情報を
     * 収集する準備ができた。 */
    while (true) {
        auto message = receiveOnly!MovementMessage();

        /* このロボットの移動を表示する。 */
        writefln("%s %s",
                 robots[message.robotId], message.movement);
    }
}
D
concurrency.5

プログラムは終了するまですべての動きを出力する:

ロボット休息時間from.linefrom.columnto.lineto.column
A600ミリ秒6273
A600ミリ秒7383
A600ミリ秒8373
B2秒-7-4-6-3
A600ミリ秒7362
A600ミリ秒6271
A600ミリ秒71(休眠)(休眠)
B2秒-6-3(休眠)(休眠)
A600ミリ秒7172
A600ミリ秒7273
C5秒-4-4-3-5
A600ミリ秒7364
...

このプログラムは、メッセージパッシングによる並行処理の有用性を示している: ロボットの動きは、互いに情報を共有せずに独立したスレッドで計算される。オーナースレッドは、メッセージボックスからメッセージを1つずつ受け取ることで、表示プロセスを単純にシリアライズする

異なるタイプのメッセージが送信される場合

receiveOnly()は、1種類のメッセージしか期待できない。一方、receive()は、複数の種類のメッセージを待つことができる。メッセージは、メッセージ処理デリゲートにディスパッチされる。メッセージが到着すると、各デリゲートのメッセージ型と比較される。特定のメッセージの型と一致するデリゲートが、そのメッセージを処理する。

例えば、次のreceive()呼び出しは、それぞれintおよびstring型のメッセージを処理する2つのメッセージハンドラを指定している。

void workerFunc() {
    bool isDone = false;

    while (!isDone) {
        void intHandler(int message) {
            writeln("handling int message: ", message);

            if (message == -1) {
                writeln("exiting");
                isDone = true;
            }
        }

        void stringHandler(string message) {
            writeln("handling string message: ", message);
        }

        receive(&intHandler, &stringHandler);
    }
}
D
concurrency.6

int型のメッセージはintHandler()に、string型のメッセージはstringHandler()に一致する。上記のワーカスレッドは、次のプログラムでテストできる。

import std.stdio;
import std.concurrency;

// ...

void main() {
    auto worker = spawn(&workerFunc);

    worker.send(10);
    worker.send(42);
    worker.send("hello");
    worker.send(-1);        // ← workerを終了する
}
D

プログラムの出力は、メッセージが受信側で一致する関数によって処理されていることを示している。

intメッセージの処理10
intメッセージの処理42
文字列メッセージの処理hello
intメッセージの処理-1
終了

opCall()メンバー関数を定義するラムダ関数およびオブジェクトも、メッセージハンドラとしてreceive()に渡すことができる。次のワーカは、ラムダ関数によってメッセージを処理する。次のプログラムでは、スレッドに終了のタイミングを通知するために、Exitという特別な型も定義している。このような特定の型を使用すると、前の例のように任意の値-1を送信する場合よりも表現力が高まる。

以下には、メッセージハンドラとしてreceive()に渡される3つの匿名関数がある。これらの関数の波括弧が強調表示されている。

import std.stdio;
import std.concurrency;

struct Exit {
}

void workerFunc() {
    bool isDone = false;

    while (!isDone) {
        receive(
            (int message) {
                writeln("int message: ", message);
            },

            (string message) {
                writeln("string message: ", message);
            },

            (Exit message) {
                writeln("exiting");
                isDone = true;
            });
    }
}

void main() {
    auto worker = spawn(&workerFunc);

    worker.send(10);
    worker.send(42);
    worker.send("hello");
    worker.send(Exit());
}
D
concurrency.7
あらゆる型のメッセージを受信する

std.variant.Variantは、あらゆるタイプのデータをカプセル化できる型だ。引数リストで先に指定したハンドラと一致しないメッセージは、常にVariantハンドラと一致する。

import std.stdio;
import std.concurrency;

void workerFunc() {
    receive(
        (int message) { /* ... */ },

        (double message) { /* ... */ },

        (Variant message) {
            writeln("Unexpected message: ", message);
        });
}

struct SpecialMessage {
    // ...
}

void main() {
    auto worker = spawn(&workerFunc);
    worker.send(SpecialMessage());
}
D
concurrency.8

出力:

予期しないメッセージSpecialMessage()

Variantの詳細は、この章の範囲外である。

一定時間までのメッセージの待機

特定の時間を超えてメッセージを待機することは意味がない場合がある。送信者が一時的に忙しかったり、例外で終了した可能性がある。receiveTimeout()は、受信スレッドが無限にブロックされるのを防ぐ。

receiveTimeout()の最初のパラメータは、メッセージを待つ時間を決定する。その戻り値は、その時間内にメッセージが受信された場合はtrue、それ以外の場合falseとなる。

import std.stdio;
import std.concurrency;
import core.thread;

void workerFunc() {
    Thread.sleep(3.seconds);
    ownerTid.send("hello");
}

void main() {
    spawn(&workerFunc);

    writeln("Waiting for a message");
    bool received = false;
    while (!received) {
        received = receiveTimeout(600.msecs,
                                  (string message) {
                                      writeln("received: ", message);
                                });

        if (!received) {
            writeln("... no message yet");

            /* ... 他の操作がここで実行される場合がある ... */
        }
    }
}
D
concurrency.9

上記の所有者は、最大 600 ミリ秒間メッセージを待機する。その時間内にメッセージが到着しない場合、他の処理を継続することができる:

メッセージ待ち
... まだメッセージがない
... まだメッセージがない
... まだメッセージがない
... まだメッセージがない
受信: こんにちは
ワーカーの実行中の例外

前の章で見たように、std.parallelismモジュールは、タスクの実行中にスローされた例外を自動的にキャッチし、所有者のコンテキストで再スローする。これにより、所有者はそのような例外をキャッチすることができる。

try {
    theTask.yieldForce();

} catch (Exception exc) {
    writefln("Detected an error in the task: '%s'",
             exc.msg);
}
D

std.concurrencyは、一般的な例外型に対してこのような便利な機能を提供していない。ただし、例外はワーカーによって明示的にキャッチして送信することができる。後述するように、OwnerTerminatedおよびLinkTerminatedの例外をメッセージとして受信することも可能だ。

以下のcalculate()関数は、stringメッセージを受け取り、それらをdoubleに変換し、0.5を加算して、結果をメッセージとして返す。

void calculate() {
    while (true) {
        auto message = receiveOnly!string();
        ownerTid.send(to!double(message) + 0.5);
    }
}
D
concurrency.10

上記のto!double()の呼び出しは、文字列をdouble値に変換できない場合、例外をスローする。このような例外はワーカスレッドをすぐに終了させるため、次のプログラムでは、オーナーは最初のメッセージに対する応答しか受け取ることができない。

import std.stdio;
import std.concurrency;
import std.conv;

// ...

void main() {
    Tid calculator = spawn(&calculate);

    calculator.send("1.2");
    calculator.send("hello");  // ← 不正確な入力
    calculator.send("3.4");

    foreach (i; 0 .. 3) {
        auto message = receiveOnly!double();
        writefln("result %s: %s", i, message);
    }
}
D
concurrency.10

所有者は"1.2"に対する応答を1.7として受け取るが、ワーカーが終了しているため、所有者は到着しないメッセージを待ってブロックされる:

結果 0: 1.7
                 ← 決して到着しないメッセージ待ち

ワーカーができることの1つは、例外を明示的にキャッチして、特別なエラーメッセージとして送信することだ。次のプログラムは、失敗の理由をCalculationFailureメッセージとして送信する。さらに、このプログラムは、終了するタイミングをワーカーに通知するために、特別なメッセージ型を利用している。

import std.stdio;
import std.concurrency;
import std.conv;

struct CalculationFailure {
    string reason;
}

struct Exit {
}

void calculate() {
    bool isDone = false;

    while (!isDone) {
        receive(
            (string message) {
                try {
                    ownerTid.send(to!double(message) + 0.5);

                } catch (Exception exc) {
                    ownerTid.send(CalculationFailure(exc.msg));
                }
            },

            (Exit message) {
                isDone = true;
            });
    }
}

void main() {
    Tid calculator = spawn(&calculate);

    calculator.send("1.2");
    calculator.send("hello");  // ← 不正確な入力
    calculator.send("3.4");
    calculator.send(Exit());

    foreach (i; 0 .. 3) {
        writef("result %s: ", i);

        receive(
            (double message) {
                writeln(message);
            },

            (CalculationFailure message) {
                writefln("ERROR! '%s'", message.reason);
            });
    }
}
D
concurrency.11

今回は、失敗の理由が所有者によって表示される:

結果
01.7
1エラー! '数字が見つからない'
23.9

別の方法として、実際の例外オブジェクト自体を所有者に送信する方法がある。所有者は例外オブジェクトを使用するか、単に再投げることもできる:

// ... ワーカーで ...
                try {
                    // ...

                } catch (shared(Exception) exc) {
                    ownerTid.send(exc);
                }},

// ... オーナーで ...
        receive(
            // ...

            (shared(Exception) exc) {
                throw exc;
            });
D

shared指定子がなぜ必要なのかは、次の章で説明する。

スレッドの終了の検出

スレッドは、メッセージの受信者が終了したことを検出できる。

OwnerTerminated例外

この例外は、所有者が終了した場合に、所有者からメッセージを受信するとスローされる。以下の仲介所有者スレッドは、ワーカに2つのメッセージを送信した後、単に終了する。これにより、ワーカスレッドでOwnerTerminated例外がスローされる。

import std.stdio;
import std.concurrency;

void main() {
    spawn(&intermediaryFunc);
}

void intermediaryFunc() {
    auto worker = spawn(&workerFunc);
    worker.send(1);
    worker.send(2);
}  // ← 2つのメッセージを送信した後、終了する

void workerFunc() {
    while (true) {
        auto m = receiveOnly!int(); // ← 所有者が終了した場合、
                                    //   例外が
                                    //   スローされる。
        writeln("Message: ", m);
    }
}
D
concurrency.12

出力:

メッセージ: 1
メッセージ: 2
std.concurrency.OwnerTerminated@std/concurrency.d(248):
オーナーが終了した

ワーカーは例外をキャッチして正常に終了できる:

void workerFunc() {
    bool isDone = false;

    while (!isDone) {
        try {
            auto m = receiveOnly!int();
            writeln("Message: ", m);

        } catch (OwnerTerminated exc) {
            writeln("The owner has terminated.");
            isDone = true;
        }
    }
}
D

出力:

メッセージ: 1
メッセージ: 2
オーナーが終了した。

以下で、この例外がメッセージとして受信される場合もあることを確認する。

LinkTerminated例外

spawnLinked()は、spawn()と同じように使われる。spawnLinked()によって起動されたワーカーが終了すると、LinkTerminated例外が所有者にスローされる。

import std.stdio;
import std.concurrency;

void main() {
    auto worker = spawnLinked(&workerFunc);

    while (true) {
        auto m = receiveOnly!int(); // ← workerが
                                    //   終了した場合、例外が
                                    //   スローされる。
        writeln("Message: ", m);
    }
}

void workerFunc() {
    ownerTid.send(10);
    ownerTid.send(20);
}  // ← 2つのメッセージを送信した後、終了する
D
concurrency.13

上記のワーカーは2つのメッセージを送信した後、終了する。ワーカーはspawnLinked()によって開始されているため、所有者はLinkTerminated例外によってワーカーの終了を通知される:

メッセージ: 10
メッセージ: 20
std.concurrency.LinkTerminated@std/concurrency.d(263):
リンクが終了した

所有者は例外をキャッチして、graceful terminationのような特別な処理を行うことができる:

bool isDone = false;

while (!isDone) {
    try {
        auto m = receiveOnly!int();
        writeln("Message: ", m);

    } catch (LinkTerminated exc) {
        writeln("The worker has terminated.");
        isDone = true;
    }
}
D

出力:

メッセージ: 10
メッセージ: 20
ワーカーが終了した。

この例外はメッセージとして受信することもできる。

例外をメッセージとして受信する

OwnerTerminatedLinkTerminatedの例外は、メッセージとして受信することもできる。以下のコードは、OwnerTerminated例外の場合の例を示す:

bool isDone = false;

while (!isDone) {
    receive(
        (int message) {
            writeln("Message: ", message);
        },

        (OwnerTerminated exc) {
            writeln("The owner has terminated; exiting.");
            isDone = true;
        }
    );
}
D
メールボックスの管理

各スレッドには、そのスレッドに送信されたメッセージを格納するプライベートメールボックスがある。メールボックス内のメッセージの数は、スレッドが各メッセージを受信して応答するのにかかる時間に応じて増減する。メールボックスが継続的に拡大すると、システム全体に負荷がかかり、プログラムの設計上の欠陥が指摘される可能性がある。また、スレッドが最新のメッセージに到達できなくなる可能性もある。

setMaxMailboxSize()は、メールボックスが保持できるメッセージの数を制限するために使用される。その3つのパラメーターは、メールボックス、保持できる最大メッセージ数、メールボックスが満杯になった場合の動作を、その順に指定する。最後のパラメーターには4つの選択肢がある:

setMaxMailboxSize()の例を見る前に、まず、メールボックスが継続的に大きくなるようにしよう。次のプログラムでは、ワーカーがメッセージを連続して送信するが、所有者は各メッセージに一定の時間を費やす。

/* 警告: このプログラムの実行中は、
 *          システムが応答しなくなる可能性がある。 */
import std.concurrency;
import core.thread;

void workerFunc() {
    while (true) {
        ownerTid.send(42);    // ← メッセージを連続的に表示する
    }
}

void main() {
    spawn(&workerFunc);

    while (true) {
        receive(
            (int message) {
                // 各メッセージに時間を費やす
                Thread.sleep(1.seconds);
            });
    }
}
D
concurrency.14

コンシューマーがプロデューサーよりも遅いため、上記のプログラムで使用されるメモリは継続的に増加する。これを防ぐため、ワーカーを開始する前に、所有者はメールボックスのサイズを制限することができる:

void main() {
    setMaxMailboxSize(thisTid, 1000, OnCrowding.block);

    spawn(&workerFunc);
// ...
}
D

上記のsetMaxMailboxSize()呼び出しは、メインスレッドのメールボックスのサイズを1000に設定する。OnCrowding.blockは、メールボックスに空きができるまで送信者を待機させる。

次の例では、OnCrowding.throwExceptionを使用している。これにより、メールボックスが満杯の状態でメッセージを送信すると、MailboxFull例外がスローされる。

import std.concurrency;
import core.thread;

void workerFunc() {
    while (true) {
        try {
            ownerTid.send(42);

        } catch (MailboxFull exc) {
            /* 送信に失敗; 後で再試行する。 */
            Thread.sleep(1.msecs);
        }
    }
}

void main() {
    setMaxMailboxSize(thisTid, 1000, OnCrowding.throwException);

    spawn(&workerFunc);

    while (true) {
        receive(
            (int message) {
                Thread.sleep(1.seconds);
            });
    }
}
D
concurrency.15
優先メッセージ

prioritySend()を使用すると、通常のメッセージよりも高い優先度でメッセージを送信できる。これらのメッセージは、メールボックス内に既に存在する他のメッセージよりも先に処理される:

prioritySend(ownerTid, ImportantMessage(100));
D

受信者が優先メッセージの型に一致するメッセージハンドラを持っていない場合、PriorityMessageExceptionがスローされる。

std.concurrency.PriorityMessageException@std/concurrency.d(280):
優先メッセージ
Undefined
スレッド名

上記で用いたシンプルなプログラムでは、所有者やワーカーのスレッド ID を渡すことは簡単だった。しかし、複数のスレッドを使用するプログラムでは、スレッド間でスレッド ID を渡すことは複雑になる可能性がある。この複雑さを軽減するため、スレッドに名前を割り当て、どのスレッドからもグローバルにアクセスできるようにすることができる。

次の3つの関数は、すべてのスレッドがアクセスできる連想配列へのインターフェースを定義している。

次のプログラムは、名前で互いを検索する2つのスレッドを開始する。これらのスレッドは、Exitメッセージによって終了を指示されるまで、互いにメッセージを送信し続ける:

import std.stdio;
import std.concurrency;
import core.thread;

struct Exit {
}

void main() {
    // パートナー名が"second"のスレッド
    auto first = spawn(&player, "second");
    register("first", first);
    scope(exit) unregister("first");

    // パートナー名が"first"のスレッド
    auto second = spawn(&player, "first");
    register("second", second);
    scope(exit) unregister("second");

    Thread.sleep(2.seconds);

    prioritySend(first, Exit());
    prioritySend(second, Exit());

    // unregister()の呼び出しが成功するには、main()は
    // workerが終了するまで待機する必要がある。
    thread_joinAll();
}

void player(string nameOfPartner) {
    Tid partner;

    while (partner == Tid.init) {
        Thread.sleep(1.msecs);
        partner = locate(nameOfPartner);
    }

    bool isDone = false;

    while (!isDone) {
        partner.send("hello " ~ nameOfPartner);
        receive(
            (string message) {
                writeln("Message: ", message);
                Thread.sleep(500.msecs);
            },

            (Exit message) {
                writefln("%s, I am exiting.", nameOfPartner);
                isDone = true;
            });
    }
}
D
concurrency.16

main()の末尾にあるthread_joinAll()呼び出しは、所有者がすべてのワーカーが終了するのを待つためのものである。

出力:

メッセージ: こんにちは、2番目
メッセージ: こんにちは、1番目
メッセージ: こんにちは、2番目
メッセージ: こんにちは、1番目
メッセージ: こんにちは、1番目
メッセージ: こんにちは、2番目
メッセージ: こんにちは、1番目
メッセージ: こんにちは、2番目
2番目、終了する。
2番目、終了する。
要約