メッセージパッシングによる並行処理
並行性は、前の章で取り上げた並列処理と似ているが、異なる概念だ。この2つの概念はどちらもスレッド上でプログラムを実行することに関わっており、並列処理は並行性に基づいてるため、しばしば混同される。
- 並列処理の主な目的は、マイクロプロセッサのコアを活用してプログラムのパフォーマンスを向上させることだ。一方、並行処理は、シングルコア環境でも必要になる概念だ。並行処理とは、1つのプログラムを複数のスレッドで同時に実行することだ。並行処理の例としては、複数のクライアントからの要求に同時に応答するサーバープログラムがある。
- 並列処理では、タスクは互いに独立している。実際、同時に実行されている他のタスクの結果に依存している場合はバグとなる。並行処理では、スレッドが他のスレッドの結果に依存することは普通のことだ。
- どちらのプログラミングモデルもオペレーティングシステムのスレッドを使用するが、並列処理ではスレッドはタスクの概念によってカプセル化される。並行処理はスレッドを明示的に使用する。
- 並列処理は使いやすく、タスクが独立している限り、正しく動作するプログラムを簡単に作成できる。並行処理は、メッセージパッシングに基づく場合のみ簡単である。ロックベースのデータ共有を含む伝統的な並行処理モデルに基づく場合、正しい並行処理プログラムを書くことは非常に困難だ。
Dは、メッセージパッシングとデータ共有の両方の並行処理モデルをサポートしている。この章ではメッセージパッシングについて、次の章ではデータ共有について説明する。
概念
スレッド: オペレーティングシステムは、スレッドと呼ばれる作業単位としてプログラムを実行する。Dプログラムは、オペレーティングシステムによってそのプログラムに割り当てられたスレッドで、main()
で実行を開始する。通常、プログラムのすべての操作はそのスレッドで実行される。プログラムは、複数のタスクを同時に実行できるように、他のスレッドを自由に開始することができる。実際、前の章で説明したタスクは、std.parallelism
によって自動的に開始されるスレッドに基づいている。
オペレーティングシステムは、予測できないタイミングで、予測できない時間だけスレッドを一時停止することができる。その結果、変数を加算するといった単純な操作でさえ、操作の途中で一時停止される可能性がある。
上記の操作は、変数の値の読み取り、値の加算、新しい値の変数への再代入という3つのステップで構成されている。スレッドは、これらのステップの間の任意の時点で一時停止され、予測できない時間後に再開される場合がある。
メッセージ: スレッド間で渡されるデータは、メッセージと呼ばれる。メッセージは、任意の型および任意の数の変数で構成される。
スレッド識別子: すべてのスレッドにはIDが割り当てられており、メッセージの受信者を指定するために使用される。
オーナー: 他のスレッドを開始するスレッドは、新しいスレッドのオーナーと呼ばれる。
ワーカー: オーナーによって開始されたスレッドは、ワーカーと呼ばれる。
スレッドの開始
spawn()
は、関数ポインタをパラメータとして取り、その関数から新しいスレッドを開始する。その関数によって実行される操作(その関数が呼び出す他の関数を含む)は、すべて新しいスレッドで実行される。spawn()
で開始されるスレッドと、task()
との主な違いは、spawn()
ではスレッド同士がメッセージを送信できる点だ。
新しいスレッドが開始されると、オーナーとワーカーは独立したプログラムのように別々に実行を開始する:
この章の例では、スレッドが同時に実行されることを示すために、Thread.sleep
を呼び出してスレッドの速度を遅くしている。プログラムの出力は、main()
を実行するスレッドと、spawn()
によって起動されたスレッドの2つのスレッドが、同時に独立して実行されていることを示している。
0 | main |
0 | worker |
1 | main |
2 | main |
1 | worker |
3 | main |
2 | worker |
4 | main |
mainは完了した。 | |
3 | worker |
4 | worker |
プログラムは自動的にすべてのスレッドの実行が完了するまで待機する。上記の出力では、main()
が"main is done"と表示して終了した後も、worker()
が実行を継続していることから、これが確認できる。
スレッド関数が受け取るパラメータは、spawn()
の2番目以降の引数として渡される。次のプログラムでは、2つのワーカスレッドがそれぞれ4つの数字を出力する。これらのスレッドは、スレッド関数のパラメータとして開始番号を受け取る。
スレッドの1つの出力をハイライト表示する:
10
20
11
21
12
22
13
23
出力の行は、オペレーティングシステムによってスレッドが一時停止および再開される方法によって、時間によって異なる場合がある。
すべてのオペレーティングシステムには、同時に存在できるスレッドの数に制限がある。この制限は、ユーザーごとに、システム全体に、あるいはその他の設定で設定できる。システム内のコアの数よりも、忙しく動作しているスレッドの数が多いと、システムの全体的なパフォーマンスが低下する。ある時点で忙しく動作しているスレッドは、その時点でCPUバインドされていると言う。一方、一部のスレッドは、ユーザーからの入力、ネットワーク接続からのデータ、Thread.sleep
呼び出しの完了など、何らかのイベントが発生するのを待つために、その時間の多くを費やす。このようなスレッドは、その時点でI/Oに縛られていると言う。スレッドの大部分がI/Oに縛られている場合、プログラムはコア数よりも多くのスレッドを開始しても、パフォーマンスの低下なしに実行可能である。プログラムのパフォーマンスに関する設計決定では常にそうであるように、実際に測定を行って、それが本当に正しいかどうかを正確に確認する必要がある。
スレッド識別子
thisTid()
は、現在のスレッドの識別子を返す。通常、関数括弧は省略して呼び出される。
thisTid()
の戻り値の型はTid
で、プログラムには何の意味もない。toString()
関数もオーバーロードされていない。
所有者 | Tid(std.concurrency.MessageBox) |
---|---|
ワーカー | Tid(std.concurrency.MessageBox) |
これまで無視してきたspawn()
の戻り値は、ワーカスレッドのIDである。
逆に、ワーカスレッドの所有者は、ownerTid()
関数によって取得される。
要約すると、所有者はownerTid
によって識別され、ワーカーはspawn()
の戻り値によって識別される。
メッセージの送信
send()
はメッセージを送信し、receiveOnly()
は特定の型のメッセージを待つ。(prioritySend()
、receive()
、receiveTimeout()
もあるが、これについては後で説明する。)
次のプログラムでは、所有者はワーカーに型int
のメッセージを送信し、ワーカーから型double
のメッセージを待つ。所有者が負のint
を送信するまで、スレッドはメッセージの送受信を続ける。以下は所有者スレッドだ。
main()
spawn()
の戻り値をworker
という名前で格納し、ワーカーにメッセージを送信する際にその変数を使用する。
一方、ワーカは必要なメッセージをint
として受け取り、その値を計算に使用し、結果をdouble
型として所有者に送信する。
メインスレッドは、送信したメッセージと受信したメッセージを報告する:
送信 | 受信 |
---|---|
1 | 0.2 |
2 | 0.4 |
3 | 0.6 |
4 | 0.8 |
同じメッセージの一部として複数の値を送信することも可能だ。次のメッセージは3つの部分で構成されている。
1つのメッセージの一部として渡される値は、受信側ではタプルとして表示される。このような場合、receiveOnly()
のテンプレートパラメータは、タプルメンバーの型と一致する必要がある。
型が一致しない場合、MessageMismatch
例外がスローされる。
出力:
ワーカーがスローする例外は、所有者はキャッチできない。1つの解決策は、ワーカーがメッセージとして送信される例外をキャッチするようにすることだ。これについては、以下で説明する。
例
これまで見た内容をシミュレーションプログラムで活用しよう。
以下のプログラムは、2 次元空間内でランダムに移動する独立したロボットをシミュレートする。各ロボットの移動は、開始時に3つの情報を取得する別々のスレッドで処理される:
- ロボットの番号(ID):この情報は、メッセージに関連するロボットを特定するためにオーナーに送信される。
- 原点:ロボットが移動を開始する位置。
- 各ステップ間の間隔:この情報は、ロボットの次のステップを決定するために使用される。
この情報は、次のJob
構造体に格納することができる。
各ロボットを動かすスレッド関数は、ロボットのIDとその動きを、所有者スレッドに連続的に送信する。
所有者は、これらのメッセージを無条件ループで待機する。メッセージの一部として送信されるロボットIDでロボットを識別する。所有者は、すべての移動を単純に表示する:
このシンプルなプログラムでは、すべてのメッセージはワーカーからオーナーに送信される。メッセージの受け渡しは、多くのプログラムではより複雑な通信を伴う。
以下に完全なプログラムを示す:
プログラムは終了するまですべての動きを出力する:
ロボット | 休息時間 | from.line | from.column | to.line | to.column |
---|---|---|---|---|---|
A | 600ミリ秒 | 6 | 2 | 7 | 3 |
A | 600ミリ秒 | 7 | 3 | 8 | 3 |
A | 600ミリ秒 | 8 | 3 | 7 | 3 |
B | 2秒 | -7 | -4 | -6 | -3 |
A | 600ミリ秒 | 7 | 3 | 6 | 2 |
A | 600ミリ秒 | 6 | 2 | 7 | 1 |
A | 600ミリ秒 | 7 | 1 | (休眠) | (休眠) |
B | 2秒 | -6 | -3 | (休眠) | (休眠) |
A | 600ミリ秒 | 7 | 1 | 7 | 2 |
A | 600ミリ秒 | 7 | 2 | 7 | 3 |
C | 5秒 | -4 | -4 | -3 | -5 |
A | 600ミリ秒 | 7 | 3 | 6 | 4 |
... |
このプログラムは、メッセージパッシングによる並行処理の有用性を示している: ロボットの動きは、互いに情報を共有せずに独立したスレッドで計算される。オーナースレッドは、メッセージボックスからメッセージを1つずつ受け取ることで、表示プロセスを単純にシリアライズする。
異なるタイプのメッセージが送信される場合
receiveOnly()
は、1種類のメッセージしか期待できない。一方、receive()
は、複数の種類のメッセージを待つことができる。メッセージは、メッセージ処理デリゲートにディスパッチされる。メッセージが到着すると、各デリゲートのメッセージ型と比較される。特定のメッセージの型と一致するデリゲートが、そのメッセージを処理する。
例えば、次のreceive()
呼び出しは、それぞれint
およびstring
型のメッセージを処理する2つのメッセージハンドラを指定している。
int
型のメッセージはintHandler()
に、string
型のメッセージはstringHandler()
に一致する。上記のワーカスレッドは、次のプログラムでテストできる。
プログラムの出力は、メッセージが受信側で一致する関数によって処理されていることを示している。
intメッセージの処理 | 10 |
---|---|
intメッセージの処理 | 42 |
文字列メッセージの処理 | hello |
intメッセージの処理 | -1 |
終了
opCall()
メンバー関数を定義するラムダ関数およびオブジェクトも、メッセージハンドラとしてreceive()
に渡すことができる。次のワーカは、ラムダ関数によってメッセージを処理する。次のプログラムでは、スレッドに終了のタイミングを通知するために、Exit
という特別な型も定義している。このような特定の型を使用すると、前の例のように任意の値-1を送信する場合よりも表現力が高まる。
以下には、メッセージハンドラとしてreceive()
に渡される3つの匿名関数がある。これらの関数の波括弧が強調表示されている。
あらゆる型のメッセージを受信する
std.variant.Variant
は、あらゆるタイプのデータをカプセル化できる型だ。引数リストで先に指定したハンドラと一致しないメッセージは、常にVariant
ハンドラと一致する。
出力:
予期しないメッセージ | SpecialMessage() |
---|
Variant
の詳細は、この章の範囲外である。
一定時間までのメッセージの待機
特定の時間を超えてメッセージを待機することは意味がない場合がある。送信者が一時的に忙しかったり、例外で終了した可能性がある。receiveTimeout()
は、受信スレッドが無限にブロックされるのを防ぐ。
receiveTimeout()
の最初のパラメータは、メッセージを待つ時間を決定する。その戻り値は、その時間内にメッセージが受信された場合はtrue
、それ以外の場合false
となる。
上記の所有者は、最大 600 ミリ秒間メッセージを待機する。その時間内にメッセージが到着しない場合、他の処理を継続することができる:
メッセージ待ち
... まだメッセージがない
... まだメッセージがない
... まだメッセージがない
... まだメッセージがない
受信: こんにちは
ワーカーの実行中の例外
前の章で見たように、std.parallelism
モジュールは、タスクの実行中にスローされた例外を自動的にキャッチし、所有者のコンテキストで再スローする。これにより、所有者はそのような例外をキャッチすることができる。
std.concurrency
は、一般的な例外型に対してこのような便利な機能を提供していない。ただし、例外はワーカーによって明示的にキャッチして送信することができる。後述するように、OwnerTerminated
およびLinkTerminated
の例外をメッセージとして受信することも可能だ。
以下のcalculate()
関数は、string
メッセージを受け取り、それらをdouble
に変換し、0.5を加算して、結果をメッセージとして返す。
上記のto!double()
の呼び出しは、文字列をdouble
値に変換できない場合、例外をスローする。このような例外はワーカスレッドをすぐに終了させるため、次のプログラムでは、オーナーは最初のメッセージに対する応答しか受け取ることができない。
所有者は"1.2"に対する応答を1.7として受け取るが、ワーカーが終了しているため、所有者は到着しないメッセージを待ってブロックされる:
結果 0: 1.7
← 決して到着しないメッセージ待ち
ワーカーができることの1つは、例外を明示的にキャッチして、特別なエラーメッセージとして送信することだ。次のプログラムは、失敗の理由をCalculationFailure
メッセージとして送信する。さらに、このプログラムは、終了するタイミングをワーカーに通知するために、特別なメッセージ型を利用している。
今回は、失敗の理由が所有者によって表示される:
結果 | 値 |
---|---|
0 | 1.7 |
1 | エラー! '数字が見つからない' |
2 | 3.9 |
別の方法として、実際の例外オブジェクト自体を所有者に送信する方法がある。所有者は例外オブジェクトを使用するか、単に再投げることもできる:
shared
指定子がなぜ必要なのかは、次の章で説明する。
スレッドの終了の検出
スレッドは、メッセージの受信者が終了したことを検出できる。
OwnerTerminated
例外
この例外は、所有者が終了した場合に、所有者からメッセージを受信するとスローされる。以下の仲介所有者スレッドは、ワーカに2つのメッセージを送信した後、単に終了する。これにより、ワーカスレッドでOwnerTerminated
例外がスローされる。
出力:
メッセージ: 1
メッセージ: 2
std.concurrency.OwnerTerminated@std/concurrency.d(248):
オーナーが終了した
ワーカーは例外をキャッチして正常に終了できる:
出力:
メッセージ: 1
メッセージ: 2
オーナーが終了した。
以下で、この例外がメッセージとして受信される場合もあることを確認する。
LinkTerminated
例外
spawnLinked()
は、spawn()
と同じように使われる。spawnLinked()
によって起動されたワーカーが終了すると、LinkTerminated
例外が所有者にスローされる。
上記のワーカーは2つのメッセージを送信した後、終了する。ワーカーはspawnLinked()
によって開始されているため、所有者はLinkTerminated
例外によってワーカーの終了を通知される:
メッセージ: 10
メッセージ: 20
std.concurrency.LinkTerminated@std/concurrency.d(263):
リンクが終了した
所有者は例外をキャッチして、graceful terminationのような特別な処理を行うことができる:
出力:
メッセージ: 10
メッセージ: 20
ワーカーが終了した。
この例外はメッセージとして受信することもできる。
例外をメッセージとして受信する
OwnerTerminated
とLinkTerminated
の例外は、メッセージとして受信することもできる。以下のコードは、OwnerTerminated
例外の場合の例を示す:
メールボックスの管理
各スレッドには、そのスレッドに送信されたメッセージを格納するプライベートメールボックスがある。メールボックス内のメッセージの数は、スレッドが各メッセージを受信して応答するのにかかる時間に応じて増減する。メールボックスが継続的に拡大すると、システム全体に負荷がかかり、プログラムの設計上の欠陥が指摘される可能性がある。また、スレッドが最新のメッセージに到達できなくなる可能性もある。
setMaxMailboxSize()
は、メールボックスが保持できるメッセージの数を制限するために使用される。その3つのパラメーターは、メールボックス、保持できる最大メッセージ数、メールボックスが満杯になった場合の動作を、その順に指定する。最後のパラメーターには4つの選択肢がある:
-
OnCrowding.block
: 送信者はメールボックスに空きができるまで待機する。 OnCrowding.ignore
: メッセージは破棄される。-
OnCrowding.throwException
: メッセージの送信時に、MailboxFull
例外がスローされる。 bool function(Tid)
型の関数ポインタ:指定された関数が呼び出される。
setMaxMailboxSize()
の例を見る前に、まず、メールボックスが継続的に大きくなるようにしよう。次のプログラムでは、ワーカーがメッセージを連続して送信するが、所有者は各メッセージに一定の時間を費やす。
コンシューマーがプロデューサーよりも遅いため、上記のプログラムで使用されるメモリは継続的に増加する。これを防ぐため、ワーカーを開始する前に、所有者はメールボックスのサイズを制限することができる:
上記のsetMaxMailboxSize()
呼び出しは、メインスレッドのメールボックスのサイズを1000に設定する。OnCrowding.block
は、メールボックスに空きができるまで送信者を待機させる。
次の例では、OnCrowding.throwException
を使用している。これにより、メールボックスが満杯の状態でメッセージを送信すると、MailboxFull
例外がスローされる。
優先メッセージ
prioritySend()
を使用すると、通常のメッセージよりも高い優先度でメッセージを送信できる。これらのメッセージは、メールボックス内に既に存在する他のメッセージよりも先に処理される:
受信者が優先メッセージの型に一致するメッセージハンドラを持っていない場合、PriorityMessageException
がスローされる。
スレッド名
上記で用いたシンプルなプログラムでは、所有者やワーカーのスレッド ID を渡すことは簡単だった。しかし、複数のスレッドを使用するプログラムでは、スレッド間でスレッド ID を渡すことは複雑になる可能性がある。この複雑さを軽減するため、スレッドに名前を割り当て、どのスレッドからもグローバルにアクセスできるようにすることができる。
次の3つの関数は、すべてのスレッドがアクセスできる連想配列へのインターフェースを定義している。
-
register()
: スレッドに名前を関連付ける。 -
locate()
: 指定された名前と関連付けられたスレッドを返す。その名前と関連付けられたスレッドが存在しない場合、Tid.init
が返される。 -
unregister()
: 指定された名前とスレッドの関連付けを解除する。
次のプログラムは、名前で互いを検索する2つのスレッドを開始する。これらのスレッドは、Exit
メッセージによって終了を指示されるまで、互いにメッセージを送信し続ける:
main()
の末尾にあるthread_joinAll()
呼び出しは、所有者がすべてのワーカーが終了するのを待つためのものである。
出力:
メッセージ: こんにちは、2番目
メッセージ: こんにちは、1番目
メッセージ: こんにちは、2番目
メッセージ: こんにちは、1番目
メッセージ: こんにちは、1番目
メッセージ: こんにちは、2番目
メッセージ: こんにちは、1番目
メッセージ: こんにちは、2番目
2番目、終了する。
2番目、終了する。
要約
- スレッドが他のスレッドに依存しない場合、並列処理を優先。これは前章で扱ったテーマである。スレッドが他のスレッドの操作に依存する場合のみ、並行処理を検討。
- データ共有による並行処理は正しく実装するのが難しいため、この章で扱うメッセージパッシングによる並行処理を優先しよう。
spawn()
spawnLinked()
はスレッドを開始する。thisTid
は現在のスレッドのIDだ。ownerTid
は、現在のスレッドの所有者のスレッド ID だ。send()
prioritySend()
はメッセージを送信する。receiveOnly()
、receive()
、およびreceiveTimeout()
はメッセージを待機する。Variant
は、あらゆるタイプのメッセージに一致する。setMaxMailboxSize()
メールボックスのサイズを制限する。register()
、unregister()
、およびlocate()
は、スレッド名を参照することを許可する。- メッセージの受け渡し中に、
MessageMismatch
、OwnerTerminated
、LinkTerminated
、MailboxFull
、PriorityMessageException
の例外がスローされることがある。 - 所有者は、ワーカーからスローされる例外を自動的にキャッチすることはできない。