並列処理
最近のほとんどのマイクロプロセッサは、複数のコアで構成されており、各コアは個別の処理ユニットとして動作する。これらのコアは、異なるプログラムの異なる部分を同時に実行することができる。std.parallelism
モジュールの機能により、プログラムはすべてのコアを活用してより高速に実行することができる。
この章では、以下の範囲アルゴリズムについて説明する。これらのアルゴリズムは、並列に実行される操作が互いに完全に独立している場合にのみ使用すべきだ。並列とは、複数のコアで同時に操作が実行されることを意味する。
parallel
: 範囲内の要素を並列にアクセスする。task
: 並列に実行されるタスクを作成する。asyncBuf
:InputRange
の要素を半熱心に並列に反復処理する。map
:InputRange
の要素を半熱心に並行して関数呼び出しを行う。amap
:RandomAccessRange
の要素を使用して、関数を完全に熱心に並行して呼び出す。reduce
:RandomAccessRange
の要素に対して並列に計算を行う。
これまで作成したプログラムでは、プログラムの式は、少なくとも一般的には1行ずつ、特定の順序で実行されると仮定してきた。
上記のコードでは、j
の値が加算される前に、i
の値が加算されることを期待している。これは意味的には正しいが、実際にはほとんどない。マイクロプロセッサやコンパイラは、最適化技術を使用して、互いに依存しない変数をマイクロプロセッサのレジスタに格納する。その場合、マイクロプロセッサは、上記の加算のような操作を並行して実行する。
これらの最適化は効果的だが、非常に低レベルの操作よりも上位のレイヤーには自動的に適用できない。特定のハイレベル操作が独立しており、並列に実行できるかどうかは、プログラマーのみが判断できる。
ループでは、範囲内の要素は通常、一つずつ順番に処理され、各要素の操作は前の要素の操作に続いて実行される:
通常、プログラムは、オペレーティングシステムによってプログラムの実行に割り当てられたマイクロプロセッサのコアの1つで実行される。foreach
ループは通常、要素を順番に処理するため、aSlowOperation()
は各学生に対して順番に呼び出される。しかし、多くの場合、後続の学生の操作を開始する前に、先行する学生の操作が完了している必要はない。Student
オブジェクトの操作が本当に独立している場合は、システム上でアイドル状態で待機している可能性のある他のマイクロプロセッサコアを無視することは無駄だ。
長時間の操作をシミュレートするために、次の例では、core.thread
モジュールからThread.sleep()
を呼び出している。Thread.sleep()
は、指定した時間だけ操作を中断する。Thread.sleep
は、コアを一切使用せずに時間を消費するため、次の例で使用する手法としては明らかに不自然だ。現実的ではない手法だが、この章では、並列処理の威力を示すために有用だ。
プログラムの実行時間は、ターミナルでtime
と入力して測定できる。
学生は順に反復処理され、各学生の作業に1秒かかるため、総実行時間は4秒になる。しかし、これらの操作が4つのコアを持つ環境で実行された場合、同時に処理され、総時間は約1秒に短縮される。
その方法を見る前に、まず、std.parallelism.totalCPUs
で、システムで使用可能なコアの数を決定しよう。
この章で記述されている環境でのプログラムの出力は次の通りだ。
このシステムには4つのコアがある。
taskPool.parallel()
この関数は、parallel()
と単純に呼び出すこともできる。
parallel()
は、範囲内の要素に並行してアクセスする。この関数は、foreach
ループで効果的に使用できる。std.parallelism
モジュールをインポートし、上記のプログラムでstudents
をparallel(students)
に置き換えるだけで、システムのすべてのコアを活用することができる。
構造体およびクラス用のforeach
で、 foreach
ブロック内の式はopApply()
メンバー関数にデリゲートとして渡されることを、すでに説明した。parallel()
は、delegate
の実行を各要素の個別のコアに分散する方法を知っている範囲オブジェクトを返す。
その結果、Student
範囲をparallel()
経由で渡すと、4 コアのシステムでは上記のプログラムは 1 秒で終了する。
注釈:プログラムの実行時間は、システムによって異なる場合があるが、おおよそ"4 秒÷コア数"になると予想される。
プログラムの特定の部分を実行する流れを、実行スレッドまたは スレッドと呼ぶ。プログラムは、同時にアクティブに実行されている複数のスレッドで構成される場合がある。オペレーティングシステムは、各スレッドをコアで起動して実行し、他のスレッドを実行するためにそのスレッドを一時停止する。各スレッドの実行には、起動と一時停止が何度も繰り返される場合がある。
ある時点でアクティブなすべてのプログラムのスレッドは、マイクロプロセッサのコア上で実行される。オペレーティングシステムは、各スレッドをいつ、どのような条件で開始および一時停止するかを決定する。そのため、aSlowOperation()
によって出力されるメッセージは、上記の出力では順番が混在している。Student
オブジェクトの操作が互いに完全に独立している場合は、スレッドの実行順序が確定しないことは問題にならないかもしれない。
各要素に適用される操作が各反復で独立している場合にのみ、parallel()
を呼び出すことは、プログラマーの責任だ。例えば、出力にメッセージが特定の順序で表示されることが重要な場合、上記のプログラムではparallel()
を呼び出すことはエラーとみなすべきだ。他のスレッドに依存するスレッドをサポートするプログラミングモデルは、並行処理と呼ばれる。並行処理は、次の章で扱うトピックだ。
並列foreach
が終了するまでに、ループ内のすべての操作がすべての要素に対して完了している。foreach
ループの後は、プログラムを安全に続行できる。
作業単位のサイズ
parallel()
の2番目のパラメーターはオーバーロードされており、一部のケースでは無視される:
RandomAccessRange
で範囲を反復処理する場合:コアへのスレッドの分散には、わずかなコストがかかる。このコストは、ループの操作がごく短時間で完了する場合などに、大きな影響を与えることがある。このような場合、各スレッドがループを1回以上実行した方が高速になることがある。作業単位のサイズは、各スレッドが各反復で実行する要素の数を決定する。
ワークユニットサイズのデフォルト値は100で、ほとんどの場合に適している。
- 非
RandomAccessRange
範囲を反復処理する場合:parallel()
は、非RandomAccessRange
の要素がワークユニットサイズの数だけ連続して実行されるまで、並列実行を開始しない。100という値は比較的大きいため、parallel()
は、短い非RandomAccessRange
範囲で試した場合、効果がないという誤った印象を与える可能性がある。 asyncBuf()
または並列map()
(どちらもこの章の後半で説明される)の結果範囲を反復処理する場合:parallel()
がasyncBuf()
またはmap()
の結果に対して動作する場合、作業単位サイズパラメーターは無視される。代わりに、parallel()
は結果範囲の内部バッファを再利用する。
Task
プログラムの他の操作と並行して実行される操作は、タスクと呼ばれる。タスクは、std.parallelism.Task
型で表される。
実際、parallel()
は各ワーカースレッドごとに新しいTask
オブジェクトを構築し、そのタスクを自動的に開始する。parallel()
は、すべてのタスクが完了するまで待機し、最後にループから退出する。parallel()
は、タスクの 構築、開始、待機を自動的に行うため、非常に便利だ。
タスクが範囲の要素に対応しない、または範囲の要素で表現できない場合、この3つのステップはプログラマが明示的に処理することができる。task()
はタスクオブジェクトを構築し、executeInNewThread()
はタスクオブジェクトを開始し、yieldForce()
はタスクオブジェクトを待機する。この3つの関数の詳細については、次のプログラムのコメントで説明する。
次のプログラムでは、anOperation()
関数が2回起動される。id
の最初の文字を出力して、どのタスクを処理しているかを示す。
注釈:通常、stdout
のような出力ストリームに出力された文字は、すぐには出力には表示されない。その代わりに、1行の出力が入力されるまで出力バッファに格納される。write
は改行文字を出力しないため、次のプログラムの並列実行を確認するには、行の終わりに達する前にstdout.flush()
を呼び出して、バッファの内容をstdout
に送信する必要がある。
プログラムの出力は次のようになるはずだ。m
とt
の文字が混在して表示されていることは、操作が並列に実行されていることを示している:
mainの呼び出しには3秒かかる
theTaskには5秒かかる
mtmttmmttmmttttt
すべて完了; 結果は2だ。
上記のタスク関数は、task()
のテンプレートパラメータとしてtask!anOperation
と指定されている。この方法は、ほとんどの場合にうまく機能するが、テンプレートの章で見たように、テンプレートのインスタンス化はそれぞれ異なる型になる。一見同等のタスクオブジェクトが実際には異なる型になってしまう場合、この区別は望ましくない場合もある。
例えば、次の2つの関数は同じシグネチャを持っているが、task()
関数テンプレートを呼び出すことで生成される2つのTask
インスタンスは、異なる型になる。その結果、これらは同じ配列のメンバにはなれない。
エラー: 型が一致しない((task(1)) : (task(2))):
'Task!(foo, int)*'と'Task!(bar, int)*'
task()
の別のオーバーロードでは、代わりに最初の関数パラメータとして関数が指定される。
この方法では、Task
テンプレートの異なるインスタンス化が発生しないため、このようなオブジェクトを同じ配列に格納することが可能になる:
ラムダ関数や、opCall
メンバーを定義する型のオブジェクトも、タスク関数として使うことができる。次の例は、ラムダを実行するタスクを開始する。
例外
タスクは個別のスレッドで実行されるため、タスクがスローする例外は、そのタスクを起動したスレッドではキャッチできない。そのため、スローされた例外はタスク自体によって自動的にキャッチされ、yieldForce()
などのTask
メンバー関数が呼び出されたときに再スローされる。これにより、メインスレッドはタスクによってスローされた例外をキャッチすることができる。
プログラムの出力は、タスクによってスローされたキャッチされていない例外によってプログラム全体がすぐに終了しないことを示している(タスクだけが終了する)。
mainは継続中
mayThrow()が開始
mayThrow()が例外をスローしている ← スロー
mainはタスクを待機中
object.Exception@deneme.d(10): エラーメッセージ ← 終了
yieldForce()
try-catch
ブロック内で呼び出して、タスクによってスローされた例外をキャッチすることができる。これは、単一スレッドの場合とは異なるので注意。この章までで作成してきたサンプルのような単一スレッドのプログラムでは、try-catch
は例外をスローする可能性のあるコードをラップする。並列処理では、yieldForce()
をラップする。
この場合、プログラムは終了せずに、メインスレッドで例外がキャッチされる。
mainは継続中
mayThrow()が開始
mayThrow()が例外をスローしている ← スロー
mainはタスクを待機中
Detected an error in the task: 'エラーメッセージ' ← キャッチ
Task
done
: タスクが完了したかどうかを指定する。タスクが例外で終了した場合、例外を再スローする。executeInNewThread()
: タスクを新しいスレッドで開始する。executeInNewThread(int priority)
: 指定した優先度で新しいスレッドでタスクを開始する。(優先度は、スレッドの実行優先度を決定するオペレーティングシステムの概念だ。)
タスクの完了を待つ関数は3つある。
yieldForce()
: タスクがまだ開始されていない場合はタスクを開始する。タスクがすでに完了している場合は、その戻り値を返す。タスクがまだ実行中の場合は、マイクロプロセッサをビジー状態にすることなく、タスクの完了を待つ。例外がスローされている場合は、その例外を再スローする。-
spinForce()
: と同様の動作をするが、完了をできるだけ早く検出するために、待機中にマイクロプロセッサをビジー状態にする。yieldForce()
-
workForce()
: と同様の動作をするが、タスクの完了を待つ間、現在のスレッドで新しいタスクを開始する。yieldForce()
ほとんどの場合、タスクの完了を待つにはyieldForce()
を呼び出すのが最も適している。この関数は、タスクが完了するまでyieldForce()
を呼び出したスレッドをサスペンドする。spinForce()
は、待機中にマイクロプロセッサをビジー状態にするが、タスクがごく短時間で完了すると予想される場合に適している。workForce()
は、現在のスレッドをサスペンドするよりも、他のタスクを開始したい場合に呼び出すことができる。
Task
の他のメンバー関数については、Phobosのオンラインドキュメントを参照。
taskPool.asyncBuf()
parallel()
と同様に、asyncBuf()
はInputRange
の範囲を並列に反復処理する。範囲から生成された要素をバッファに格納し、そのバッファから要素をユーザーに提供する。
潜在的に完全に遅延評価の入力範囲を完全に即時評価の範囲にしないため、要素を波状に反復処理する。並列に一定数の要素を準備すると、それらの要素がpopFront()
によって消費されるまで待機し、次に次の波の要素を生成する。
asyncBuf()
範囲と、各波で利用可能にする要素数を決定するオプションのバッファサイズを受け取る:
asyncBuf()
の効果を確認するため、反復に半秒、各要素の処理に半秒かかる範囲を使用する。この範囲は、指定された上限までの整数を単純に生成する:
要素は遅延生成され、使用される。各要素の処理に1秒かかるため、このプログラムでは全体の範囲を処理するのに10秒かかる:
その出力によると、要素は順序通りに生成され使用されている。
一方、前の要素が処理されるのを待たずに、次の要素の生成を開始する必要はないかもしれない。前の要素が使用中である間にも他の要素を生成できれば、プログラムの処理時間が短縮される:
上記の呼び出しでは、asyncBuf()
はバッファに2つの要素を準備する。要素は使用中に並列に生成される:
バッファサイズのデフォルト値は 100 だ。最適なバッファサイズは、状況によって異なる。
asyncBuf()
は、foreach
ループの外でも使うことができる。例えば、次のコードでは、asyncBuf()
の戻り値を、セミイージーに動作するInputRange
として使っている。
taskPool.map()
taskPool.map()
を説明する前に、std.algorithm
モジュールからmap()
について説明しておくとわかりやすい。std.algorithm.map
は、多くの関数型言語でよく見られるアルゴリズムである。これは、範囲内の要素を1つずつ関数に呼び出し、その関数を各要素に対して呼び出した結果で構成される範囲を返す。これは遅延アルゴリズムだ。つまり、必要に応じて関数を呼び出す。(std.algorithm.each
もある。これは、結果を生成するのではなく、各要素に対して副作用を生成するためのものだ。
std.algorithm.map
が遅延して動作するという事実は、多くのプログラムにおいて非常に強力だ。しかし、いずれにせよすべての要素に対して関数を呼び出す必要があり、各要素に対する操作が互いに独立している場合は、遅延は並列実行よりも不必要に速度を低下させる可能性がある。std.parallelism
モジュールにあるtaskPool.map()
およびtaskPool.amap()
は、マルチコアの利点を活用し、多くの場合、より高速に実行される。
Student
の例を使って、これら3つのアルゴリズムを比較しよう。Student
には、学生の平均成績を返すメンバー関数があると仮定しよう。並列アルゴリズムがどのように高速であるかを実証するために、この関数をThread.sleep()
を使って再び遅くしよう。
std.algorithm.map
は、関数をテンプレートパラメータとして、範囲を関数パラメータとして受け取る。この関数は、その関数を範囲の要素に適用した結果で構成される範囲を返す。
この関数は、前の章で見たように、=>
構文でラムダ式として指定することができる。次のプログラムは、map()
を使用して、各要素に対してメンバー関数averageGrade()
を呼び出す。
プログラムの出力は、map()
が遅延評価で動作することを示している。foreach
ループが反復するたびに、各結果に対してaverageGrade()
が呼び出される:
std.algorithm.map
がイアグアルゴリズムだった場合、操作の開始と終了に関するメッセージはすべてプログラムの先頭に一括して出力される。
taskPool.map()
std.parallelism
モジュールは、基本的にstd.algorithm.map
と同じように動作する。唯一の違いは、関数呼び出しを半熱心に実行し、結果をバッファに格納して、必要に応じて提供することだ。このバッファのサイズは、2番目のパラメータで決定される。例えば、次のコードは、3つの要素の関数呼び出しの結果を一度に準備する。
注釈:上記の独立したaverageGrade()
関数は、TaskPool.map
のようなメンバー関数テンプレートでローカルデリゲートを使用する場合の制限により必要だ。この独立した関数がないと、コンパイルエラーになる。
今回は、3つの要素ずつ波状に操作が実行される:
map()
の2番目のパラメータは、asyncBuf()
と同じ意味だ。map()
が結果を格納するために使用するバッファのサイズを決定する。3番目のパラメータは、parallel()
と同じ作業単位のサイズだ。違いは、デフォルト値がsize_t.max
であることだ。
taskPool.amap()
Parallelamap()
は、parallelmap()
と2つの違いを除いて同じように動作する。
- 完全にイージーだ。
RandomAccessRange
の範囲で動作する。
イージングされているため、amap()
が返される時点ですべての結果が準備完了になっている:
amap()
map()
よりも高速に動作するが、すべての結果を格納できる十分なサイズの配列を使用する代わりに、メモリ消費量が増加する。
amap()
のオプションの2番目のパラメーターは、作業単位のサイズでもある:
結果は、amap()
の3番目のパラメーターとして渡されるRandomAccessRange
に格納することもできる:
taskPool.reduce()
map()
同様、reduce()
はstd.algorithm
モジュールから説明した方がわかりやすい。
reduce()
は、Rangesの章で見たstd.algorithm.fold
と同等だ。2つの主な違いは、関数パラメータの順番が逆になっていることだ。(そのため、連鎖した範囲式でUFCSを活用できるfold()
を、並列化されていないコードには使うことをお勧めする。)
reduce()
は、多くの関数型言語でよく見られるもう1つの高レベルアルゴリズムだ。map()
と同様に、1つ以上の関数をテンプレートパラメータとして取る。関数パラメータとしては、結果の初期値として使用する値と、範囲を取る。reduce()
は、結果の現在の値と範囲の各要素を使用して関数を呼び出す。初期値が指定されていない場合は、代わりに範囲の最初の要素が使用される。
実装でresult
という変数が定義されていると仮定すると、reduce()
の動作は次の手順で説明できる。
- 初期値を
result
result = func(result, element)
式を実行する- の最終値を返す
result
例えば、配列の要素の2乗の合計は、次のプログラムのように計算できる。
上記のプログラムのように、関数が=>
構文で指定されている場合、最初のパラメータ(ここではa
)は結果の現在の値(上記のパラメータ0
によって初期化)を表し、2番目のパラメータ(ここではb
)は現在の要素を表す。
このプログラムは、25と100の和、5と10の平方を10ずつ出力する。
125
その動作から明らかなように、reduce()
は実装にループを使用している。このループは通常、単一のコアで実行されるため、各要素の関数呼び出しが互いに独立している場合、不必要に遅くなる可能性がある。このような場合、std.parallelism
モジュールにあるtaskPool.reduce()
を使用すると、すべてのコアを活用することができる。
この例を見るために、再び人為的に速度を落とした関数を使ってreduce()
を使ってみよう。
reduce()
は、要素を順番に使用して結果の最終値に到達する。
parallel()
およびmap()
の例と同様に、std.parallelism
モジュールをインポートし、taskPool.reduce()
を呼び出すだけで、すべてのコアを活用することができる。
ただし、taskPool.reduce()
の動作には重要な違いがある。
他の並列アルゴリズムと同様、taskPool.reduce()
は、異なるタスクの要素を使用して関数を並行して実行する。各タスクは、割り当てられた要素を処理し、そのタスクの要素に対応するresult
を計算する。reduce()
は1つの初期値のみで呼び出されるため、すべてのタスクは、その初期値を使用して、自身のresult
(上記のパラメータ0
)を初期化する必要がある。
各タスクが生成する結果の最終値は、同じresult
計算で最後に1回使用される。これらの最終計算は、並行ではなく順番に実行される。そのため、この章のような短い例では、taskPool.reduce()
の実行速度が低下する場合がある。これは、次の出力で確認できる。
すべてのタスクに同じ初期値が使用され、事実上計算で複数回使用されるため、taskPool.reduce()
はstd.algorithm.reduce()
が計算した結果とは異なる結果を計算する可能性がある。そのため、初期値は、実行する計算の識別値である必要がある。例えば、この例では、追加の効果のない0
である。
さらに、結果は連続した計算で同じ関数によって最後に1回使用されるため、関数が取るパラメータの型は、関数が返す値の型と互換性がある必要がある。
taskPool.reduce()
これらの考慮事項の下でのみ使用すべきだ。
プログラムの出力は、まず並列で計算が実行され、その後その結果が順次計算されることを示している。順次実行される計算はハイライト表示されている:
並列reduce()
は、数学定数π(π)の積分計算など、他の多くの計算でも高速だ。
複数の関数とタプル結果
std.algorithm.map()
、taskPool.map()
、taskPool.amap()
、およびtaskPool.reduce()
は、複数の関数を受け取ることができ、その場合は、結果はTuple
として返される。Tuple
型は、前のタプルの章で説明した。個々の関数の結果は、関数が指定された順にタプルの要素に対応する。例えば、最初の関数の結果は、タプルの最初のメンバーになる。
次のプログラムは、std.algorithm.map
を使用した複数の関数を示している。以下のquarterOf()
およびtenTimes()
関数でわかるように、関数の戻り値の型は同じである必要はない。その場合、タプルの要素の型も異なる。
出力:
4分の1 | 10回分 |
---|---|
2.50 | 100 |
10.50 | 420 |
25.00 | 1000 |
taskPool.reduce()
の場合、結果の初期値はタプルとして指定する必要がある。
TaskPool
裏では、std.parallelism
モジュール内の並列アルゴリズムはすべて、TaskPool
コンテナの要素であるタスクオブジェクトを使用している。通常、すべてのアルゴリズムは、taskPool
という名前の同じコンテナオブジェクトを使用する。
taskPool
このコンテナは、プログラムが実行される環境に応じて適切な数のタスクを含む。そのため、通常は他のTaskPool
オブジェクトを作成する必要はない。それでも、必要に応じて明示的なTaskPool
オブジェクトを作成して使用することもできる。
TaskPool
コンストラクタは、後でそれを通じて開始される並列操作で使用するスレッドの数を引数として受け取る。スレッドの数のデフォルト値は、システム上のコア数の 1 減だ。この章で説明したすべての機能は、別のTaskPool
オブジェクトにも適用できる。
次の例は、ローカルTaskPool
オブジェクトでparallel()
を呼び出している。
TaskPool.finish()
現在のすべてのタスクが完了した際にオブジェクトの処理を停止するように指示する。
要約
- 操作が互いに独立していない場合、操作を並列に実行することはエラーである。
parallel()
範囲内の要素を並列にアクセスする。- タスクは、
task()
、executeInNewThread()
、yieldForce()
を使用して、それぞれ明示的に作成、開始、待機することができる。 - タスクからエスケープされた例外は、
yieldForce()
などのほとんどの並列処理関数で後でキャッチすることができる。 asyncBuf()
InputRange
の要素を半熱心に並列に反復する。map()
InputRange
の要素を半熱心に並行して関数に呼び出す。amap()
RandomAccessRange
の要素を使用して、関数を完全にイージングして並行して呼び出す。reduce()
RandomAccessRange
の要素に対して並列に計算を行う。map()
、amap()
、およびreduce()
は、複数の関数を受け取り、その結果をタプルとして返すことができる。- 必要に応じて、
taskPool
以外のTaskPool
オブジェクトを使用することもできる。