データ共有の並列処理

前の章では、メッセージの受け渡しによって情報を共有するスレッドについて説明した。その章で述べたように、メッセージの受け渡しは、安全な同時実行の方法だ。

別の方法としては、複数のスレッドが同じデータを読み書きする方法がある。例えば、所有者スレッドがbool変数のアドレスでワーカを起動し、ワーカはその変数の現在の値を読み取って、終了するかどうかを決定することができる。別の例としては、所有者が同じ変数のアドレスで複数のワーカを起動し、その変数が複数のワーカによって変更される場合がある。

データ共有が安全でない理由のひとつは、レース条件である。レース条件は、複数のスレッドが、制御されない順序で同じ変更可能なデータにアクセスした場合に発生する。オペレーティングシステムは、個々のスレッドを不特定の方法で一時停止および起動するため、レース条件のあるプログラムの動作は予測できない。

この章の例は単純に見えるかもしれない。しかし、ここで取り上げている問題は、より大規模な実際のプログラムでも発生する。また、これらの例ではstd.concurrencyモジュールを使用しているが、この章で説明する概念はcore.threadモジュールにも適用される。

共有は自動的ではない

他のほとんどのプログラミング言語とは異なり、Dではデータは自動的に共有されない。データはデフォルトでスレッドローカルである。モジュールレベルの変数は、すべてのスレッドからアクセスできる印象があるが、実際には各スレッドは独自のコピーを取得する。

import std.stdio;
import std.concurrency;
import core.thread;

int variable;

void printInfo(string message) {
    writefln("%s: %s (@%s)", message, variable, &variable);
}

void worker() {
    variable = 42;
    printInfo("Before the worker is terminated");
}

void main() {
    spawn(&worker);
    thread_joinAll();
    printInfo("After the worker is terminated");
}
D
concurrency_shared.1

worker()内で変更されたvariableは、main()によって認識されるvariableとは別のものになる。この事実は、変数の値とアドレスの両方を出力することで確認できる。

アクションアドレス
ワーカーが終了する前427F26C6711670
ワーカーが終了した後07F26C68127D0

各スレッドは独自のデータコピーを取得するため、spawn()ではスレッドローカル変数への参照を渡すことはできない。例えば、bool変数のアドレスを別のスレッドに渡そうとする次のプログラムは、コンパイルできない。

import std.concurrency;

void worker(bool * isDone) {
    while (!(*isDone)) {
        // ...
    }
}

void main() {
    bool isDone = false;
    spawn(&worker, &isDone);      // ← コンパイルエラー

    // ...

    // workerに終了を通知したい:
    isDone = true;

    // ...
}
D
concurrency_shared.2

std.concurrencyモジュール内のstatic assertは、別のスレッドから変更可能なデータへのアクセスを禁止する。

src/phobos/std/concurrency.d(329): エラー: static assert
"変更可能なスレッドローカルデータへのエイリアスは許可されていない。"
Undefined

モジュール内のisDoneは、スレッド間で変更可能なデータへのアクセスを禁止する。

この規則の例外は、__gsharedとして定義された変数だ:

__gshared int globallyShared;
D

このような変数はプログラム全体で1つのコピーしか存在せず、すべてのスレッドがその変数を共有できる。__gsharedは、CやC++のような言語のライブラリと相互作用する場合に必要である。これらの言語では、データ共有がデフォルトで自動化されているためだ。

sharedスレッド間で変更可能なデータを共有するには

共有する必要がある変更可能な変数は、sharedキーワードで定義する必要がある。

import std.concurrency;

void worker(shared(bool) * isDone) {
    while (*isDone) {
        // ...
    }
}

void main() {
    shared(bool) isDone = false;
    spawn(&worker, &isDone);

    // ...

    // workerに終了を通知する:
    isDone = true;

    // ...
}
D
concurrency_shared.3

注釈:スレッドにシグナルを送るには、メッセージパッシングを使用することをお勧めする。

一方、immutable変数は変更できないため、直接共有しても問題はない。そのため、immutablesharedを意味する:

import std.stdio;
import std.concurrency;
import core.thread;

void worker(immutable(int) * data) {
    writeln("data: ", *data);
}

void main() {
    immutable(int) i = 42;
    spawn(&worker, &i);         // ← コンパイルする

    thread_joinAll();
}
D
concurrency_shared.4

出力:

データ42

iの寿命はmain()のスコープによって定義されるため、main()がワーカスレッドよりも先に終了しないことが重要だ。上記のcore.thread.thread_joinAllの呼び出しは、スレッドがすべての子スレッドが終了するのを待つためだ。

レース条件の例

変更可能なデータがスレッド間で共有されている場合、プログラムの正確性には特別な注意が必要である。

競合状態の例を見るために、同じ変更可能変数を共有する複数のスレッドを考えてみよう。次のプログラムでは、スレッドは2つの変数としてアドレスを受け取り、その値を何度も交換している。

import std.stdio;
import std.concurrency;
import core.thread;

void swapper(shared(int) * first, shared(int) * second) {
    foreach (i; 0 .. 10_000) {
        int temp = *second;
        *second = *first;
        *first = temp;
    }
}

void main() {
    shared(int) i = 1;
    shared(int) j = 2;

    writefln("before: %s and %s", i, j);

    foreach (id; 0 .. 10) {
        spawn(&swapper, &i, &j);
    }

    // すべてのスレッドがタスクを完了するまで待つ
    thread_joinAll();

    writefln("after : %s and %s", i, j);
}
D
concurrency_shared.5

上記のプログラムはコンパイルは成功するが、ほとんどのケースで正しく動作しない。10個のスレッドが起動し、すべて同じ2つの変数ijにアクセスする。レース条件により、他のスレッドの操作を意図せず妨害してしまう。

また、交換の総回数は 10,000回×10回であることも注目。この回数は偶数であるため、変数の値は、最終的には初期値である1と2になることが予想される。

前: 1と2
後 : 1と2    ← 期待される結果

この結果が生じる可能性はあるが、実際の結果はほとんどの場合、以下のいずれかになる:

前: 1と2
後 : 1と1    ← 誤った結果
前: 1と2
後 : 2と2    ← 誤った結果

"2と1"という結果になる可能性は極めて低いものの、あり得ないわけではない。

プログラムが正しく動作しない理由は、レース状態にある2つのスレッド間の以下のシナリオで説明できる。オペレーティングシステムは、不確定なタイミングでスレッドを一時停止および再開するため、2つのスレッドの操作は、以下の順序で実行される可能性もある。

iが1、jが2の状態を考えてみよう。2つのスレッドは同じswapper()関数を実行するが、ローカル変数tempはスレッドごとに別であり、他のスレッドのtemp変数とは無関係であることを覚えておいて。これらの別々の変数を識別するために、以下ではtempAおよびtempBと名前を変更している。

以下のチャートは、forループ内の3行のコードが、各スレッドによって時間経過とともに実行される方法を、上から下へ、操作1が最初の操作、操作6が最後の操作として示している。各ステップでiまたはjが変更されたかどうかは、その変数をハイライト表示して示している:

操作        スレッドA                             スレッドB
────────────────────────────────────────────────────────────────────────────

  1:   int temp = *second; (tempA==2)
  2:   *second = *first;   (i==1, j==1)

          (この時点でAが一時停止され、Bが開始されていると仮定)

  3:                                        int temp = *second; (tempB==1)
  4:                                        *second = *first;   (i==1, j==1)

          (この時点でBが一時停止し、Aが再開されていると仮定)

  5:   *first = temp;    (i==2, j==1)

          (この時点でAが一時停止し、Bが再開されていると仮定)

  6:                                        *first = temp;    (i==1, j==1)

ご覧のとおり、前のシナリオの最後では、ijはどちらも値1になる。この時点以降、これらの変数が他の値になることはあり得ない。

上記のシナリオは、プログラムの誤った結果を説明するのに十分な1つの例にすぎない。もちろん、この例の10個のスレッドの場合、レース条件ははるかに複雑になるだろう。

synchronizedレース条件を回避する

上記のプログラムの誤動作は、複数のスレッドが同じ変更可能なデータにアクセスし(そのうちの少なくとも1つがデータを変更した)、発生している。このようなレースコンディションを回避する1つの方法は、共通のコードにキーワードsynchronizedを指定することだ。次の変更を加えると、プログラムは正しく動作する。

foreach (i; 0 .. 10_000) {
    synchronized {
        int temp = *b;
        *b = *a;
        *a = temp;
    }
}
D

出力:

前: 1と2
後 : 1と2      ← 正しい結果

synchronizedの効果は、バックグラウンドでロックを作成し、特定の時点において1つのスレッドのみがそのロックを保持できるようにすることだ。ロックを保持しているスレッドのみ実行可能となり、他のスレッドは、実行中のスレッドがsynchronizedブロックを完了してロックが再び利用可能になるまで待機する。同期化されたコードは一度に1つのスレッドによって実行されるため、各スレッドは、別のスレッドが同じ操作を行う前に、値を安全に交換する。同期化されたブロックの処理が終了すると、変数iおよびjの状態は、常に"1 および 2"または"2 および 1"になる。

注釈:スレッドがロックを待つことは比較的コストの高い操作であり、プログラムの実行を著しく遅らせる可能性がある。幸い、synchronizedブロックを使用せずに、後で説明するアトミック操作を利用することで、プログラムの正確性を確保できる場合がある。

複数のコードブロックを同期する必要がある場合は、synchronizedキーワードを使用して1つ以上のロックを指定することができる。

同じ共有変数にアクセスする2つの別々のコードブロックを含む次のプログラムで、この例を見てみよう。このプログラムは、同じ変数のアドレスを持つ2つの関数を呼び出し、一方の関数はその変数を指定された回数だけ加算し、もう一方の関数は指定された回数だけ減算する。

void incrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        *value = *value + 1;
    }
}

void decrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        *value = *value - 1;
    }
}
D

注釈:上記の式の短縮形 (++(*value)および‑‑(*value)) を使用すると、shared変数に対するこのような読み取り-変更-書き込み操作は廃止予定であるとの警告がコンパイラから表示される。

残念ながら、これらのブロックを個別にsynchronizedでマークするだけでは不十分である。なぜなら、2つのブロックの匿名ロックは独立しているため、2つのコードブロックは依然として同じ変数を同時にアクセスするからだ:

import std.stdio;
import std.concurrency;
import core.thread;

enum count = 1000;

void incrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        synchronized { // ← このロックは、下のロックとは異なる。
            *value = *value + 1;
        }
    }
}

void decrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        synchronized { // ← このロックは、上のロックとは異なる。
            *value = *value - 1;
        }
    }
}

void main() {
    shared(int) number = 0;

    foreach (i; 0 .. 100) {
        spawn(&incrementer, &number);
        spawn(&decrementer, &number);
    }

    thread_joinAll();
    writeln("Final value: ", number);
}
D
concurrency_shared.6

同じ変数を同じ回数加算および減算するスレッドの数が同じであるため、numberの最終的な値は0になることが予想される。しかし、ほとんどの場合、そうはならない。

最終値: -672    ← 0ではない

複数のブロックが同じロックまたはロックを使用するには、synchronizedの括弧内にロックオブジェクトを指定する必要がある:

注釈:この機能はdmd 2.098.1ではサポートされていない。

// 注釈: dmd 2.098.1はthis機能をサポートしていない。
synchronized (lock_object, another_lock_object, ...)
D

Dでは、任意のクラスオブジェクトをsynchronizedロックとして使用できるため、特別なロック型は必要ない。次のプログラムは、そのオブジェクトをロックとして使用する、Lockという空のクラスを定義している。

import std.stdio;
import std.concurrency;
import core.thread;

enum count = 1000;

class Lock {
}

void incrementer(shared(int) * value, shared(Lock) lock) {
    foreach (i; 0 .. count) {
        synchronized (lock) {
            *value = *value + 1;
        }
    }
}

void decrementer(shared(int) * value, shared(Lock) lock) {
    foreach (i; 0 .. count) {
        synchronized (lock) {
            *value = *value - 1;
        }
    }
}

void main() {
    shared(Lock) lock = new shared(Lock)();
    shared(int) number = 0;

    foreach (i; 0 .. 100) {
        spawn(&incrementer, &number, lock);
        spawn(&decrementer, &number, lock);
    }

    thread_joinAll();
    writeln("Final value: ", number);
}
D
concurrency_shared.7

今回は両方のsynchronizedブロックが同じロックで接続されているため、いずれか一方のみが実行され、結果は期待通り0になる:

最終値: 0       ← 正しい結果

クラス型もsynchronizedとして定義することができる。これは、その型の非静的メンバ関数がすべて、そのクラスの特定のオブジェクトで同期化されることを意味する。

synchronized class Class {
    void foo() {
        // ...
    }

    void bar() {
        // ...
    }
}
D

以下のコードは、上記のクラス定義と等価である:

class Class {
    void foo() {
        synchronized (this) {
            // ...
        }
    }

    void bar() {
        synchronized (this) {
            // ...
        }
    }
}
D

複数のオブジェクトでコードブロックを同期化する必要がある場合は、それらのオブジェクトを一緒に指定する必要がある。そうしないと、複数のスレッドが、他のスレッドが待機しているオブジェクトをロックしてしまう可能性があり、その場合はプログラムがデッドロック状態になる可能性がある。

この問題のよく知られた例は、ある銀行口座から別の銀行口座へ送金を行う関数だ。この関数がマルチスレッド環境で正しく動作するには、まず両方の口座をロックする必要がある。しかし、次の試みは間違っている。

void transferMoney(shared BankAccount from,
                   shared BankAccount to) {
    synchronized (from) {           // ← 正しくない
        synchronized (to) {
            // ...
        }
    }
}
D

このエラーは、1つのスレッドが口座Aから口座Bへ送金しようとしている間に、別のスレッドが逆方向の送金を試みた場合で説明できる。各スレッドは、それぞれが対応するfromオブジェクトをロックし、次にtoオブジェクトをロックするつもりだった可能性がある。fromオブジェクトは、2つのスレッドでそれぞれAとBに対応しているため、オブジェクトは別々のスレッドでロック状態になり、もう一方のスレッドがtoオブジェクトをロックできなくなる。この状況はデッドロックと呼ばれる。

この問題の解決策は、オブジェクト間の順序関係を定義し、その順序でロックすることだ。これは、synchronized文によって自動的に処理される。Dでは、このようなデッドロックを回避するには、同じsynchronized文でオブジェクトを指定するだけで十分である。

注釈:この機能はdmd 2.098.1ではサポートされていない。

void transferMoney(shared BankAccount from,
                   shared BankAccount to) {
    // 注: dmd 2.098.1では、this機能はサポートされていない。
    synchronized (from, to) {       // ← 正しい
        // ...
    }
}
D
単一の初期化にはshared static this()、単一の最終化にはshared static ~this()

モジュール(その変数を含む)の初期化にstatic this()を使用できることは既に説明した。データはデフォルトでスレッドローカルであるため、モジュールレベルの変数をすべてのスレッドで初期化するには、static this()をすべてのスレッドで実行する必要がある:

import std.stdio;
import std.concurrency;
import core.thread;

static this() {
    writeln("executing static this()");
}

void worker() {
}

void main() {
    spawn(&worker);

    thread_joinAll();
}
D
concurrency_shared.8

上記のstatic this()ブロックは、メインスレッドとワーカースレッドでそれぞれ1回ずつ実行される:

static this()を実行している
static this()を実行している

これは、sharedモジュール変数に問題を引き起こす。変数を複数回初期化することは、特に並行処理においてレース条件により不正な動作を引き起こすためだ。(これは、immutable変数にも適用される。なぜなら、それらは暗黙的にsharedであるためだ。)解決策は、プログラムごとに一度だけ実行されるshared static this()ブロックを使用することだ:

int a;              // スレッドローカル
immutable int b;    // すべてのスレッドで共有される

static this() {
    writeln("Initializing per-thread variable at ", &a);
    a = 42;
}

shared static this() {
    writeln("Initializing per-program variable at ", &b);
    b = 43;
}
D

出力:

6B0120でプログラムごとの変数を初期化    ← 1回のみ
7FBDB36557D0でスレッドごとの変数を初期化
7FBDB3554670でスレッドごとの変数を初期化

同様に、shared static ~this()は、プログラムごとに一度だけ実行する必要がある最終操作に使用する。

アトミック操作

特定の変数を1つのスレッドだけが変更するようにするためのもう1つの方法は、アトミック操作を使うことだ。この機能は、マイクロプロセッサ、コンパイラ、またはオペレーティングシステムによって提供されている。

Dのアトミック操作は、core.atomicモジュールにある。この章では、その関数のうち2つだけを見ていく。

atomicOp

この関数は、そのテンプレートパラメータを2つの関数パラメータに適用する。テンプレートパラメータは、 "+", "+=",

import core.atomic;

// ...

        atomicOp!"+="(*value, 1);    // atomic
D

上記の行は、以下の行と等価だが、+=操作が他のスレッドによって中断されずに実行される点(つまり、アトミックに実行される点)が異なる:

*value += 1;                 // アトミックではない
D

したがって、同期化が必要なのがバイナリ演算のみの場合は、ロックを取得する必要があるため遅くなることが知られているsynchronizedブロックは必要ない。atomicOpを使用するincrementer()およびdecrementer()関数の以下の同等も正しい。Lockクラスも不要になったことに注意しよう。

import core.atomic;

//...

void incrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        atomicOp!"+="(*value, 1);
    }
}

void decrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        atomicOp!"-="(*value, 1);
    }
}
D

atomicOpは、他の二項演算子とも使用可能である。

cas

この関数の名前は、"compare and swap"の略語だ。その動作は、変数が現在の値を持っている場合にその値を変更すると表現できる。この関数は、変数の現在の値と希望する値を同時に指定して使用する。

bool is_mutated = cas(address_of_variable, currentValue, newValue);
D

cas()が動作しているときに変数の値がまだcurrentValueであるということは、このスレッドが最後に変数を読み込んだ後、他のスレッドによって変数が変更されていないことを示している。その場合、cas()は、変数にnewValueを代入し、trueを返す。一方、変数の値がcurrentValueと異なる場合、cas()は変数を変更せず、falseを返す。

次の関数は、現在の値を再読み込みし、操作が成功するまでcas()を呼び出す。ここでも、これらの呼び出しは、変数の値が古い値と等しい場合、新しい値に置き換える、と表現することができる。

void incrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        int currentValue;

        do {
            currentValue = *value;
        } while (!cas(value, currentValue, currentValue + 1));
    }
}

void decrementer(shared(int) * value) {
    foreach (i; 0 .. count) {
        int currentValue;

        do {
            currentValue = *value;
        } while (!cas(value, currentValue, currentValue - 1));
    }
}
D

上記の関数は、synchronizedブロックを使用せずに正しく動作する。

ほとんどの場合、core.atomicモジュールの機能は、synchronizedブロックを使用する場合よりも数倍高速だ。同期が必要な操作が1つのコードブロック未満である場合は、このモジュールの使用を検討することをお勧めする。

アトミック操作は、ロックフリーのデータ構造も実現するが、これは本書の範囲外である。

core.syncパッケージも調査することをお勧めする。このパッケージには、以下のモジュールに古典的な並行処理プリミティブが含まれている。

要約