[インデックス 16129] ファイルの概要
このコミットは、Goランタイムにおけるハッシュマップ(Hmap
)のflags
フィールドに対するデータ競合を修正するものです。並行してマップ操作が行われる際に、flags
フィールドの更新が失われる可能性があった問題を、アトミック操作(CompareAndSwap, CAS)を導入することで解決しています。これにより、マップのイテレータの初期化やリフレクションを通じたマップ操作が、より安全に並行実行できるようになります。
コミット
commit b6bfc92df363606f982517f8c9bb840ebaef9053
Author: Dmitriy Vyukov <dvyukov@google.com>
Date: Sun Apr 7 18:19:16 2013 -0700
runtime: fix race on hashmap flags field
Use atomic operations on flags field to make sure we aren't
losing a flag update during parallel map operations.
R=golang-dev, dave, r
CC=golang-dev
https://golang.org/cl/8377046
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/b6bfc92df363606f982517f8c9bb840ebaef9053
元コミット内容
Goランタイムのハッシュマップのflags
フィールドにおける競合状態を修正します。並行するマップ操作中にフラグの更新が失われないように、flags
フィールドにアトミック操作を使用します。
変更の背景
Goのマップは、組み込み型として非常に頻繁に使用されますが、その内部実装はC言語で書かれたランタイムコードによって支えられています。マップの内部状態を管理するためにflags
フィールドが使用されており、例えばイテレータが存在するかどうか、メモリ解放の挙動など、マップの様々な特性を示すビットフラグが格納されています。
問題は、複数のゴルーチンが同時にマップを操作しようとした際に、このflags
フィールドの更新が非アトミックに行われていた点にありました。具体的には、マップのイテレータを初期化するhash_iter_init
関数や、リフレクションを通じてマップを操作するreflect·mapiterinit
関数が、flags
フィールドを直接読み込み、ビット演算でフラグを追加・削除し、その結果を書き戻していました。
このような「読み込み-変更-書き込み」のシーケンスは、並行処理環境下では典型的なデータ競合(Race Condition)の温床となります。
例えば、ゴルーチンAがflags
を読み込み、あるフラグを立てようとしている間に、ゴルーチンBも同じflags
を読み込み、別のフラグを立てようとします。もしゴルーチンAが先に書き戻し、その後にゴルーチンBが書き戻すと、ゴルーチンAが立てたフラグはゴルーチンBの書き込みによって上書きされ、失われてしまう可能性があります。
このコミットは、このようなflags
フィールドの更新の喪失を防ぎ、マップの並行操作における堅牢性を向上させることを目的としています。
前提知識の解説
Goのマップの内部構造 (Hmap
, Bucket
)
Goのマップは、内部的にはハッシュテーブルとして実装されており、runtime
パッケージ内のC言語コードで定義されています。
Hmap
: マップ全体のメタデータを保持する構造体です。マップの要素数(count
)、バケットの数(B
)、キーや値のサイズ、そしてマップの状態を示すflags
フィールドなどが含まれます。Bucket
: 実際のキーと値のペアを格納する配列です。ハッシュ値に基づいて、キーは適切なバケットに配置されます。
Goの並行処理とGOMAXPROCS
Goはゴルーチンとチャネルによって強力な並行処理をサポートしています。複数のゴルーチンが同時に実行されることで、プログラムのパフォーマンスを向上させることができます。
GOMAXPROCS
: Goランタイムが同時に実行できるOSスレッドの最大数を制御する環境変数、またはruntime.GOMAXPROCS
関数です。デフォルトではCPUの論理コア数に設定されます。GOMAXPROCS
が1より大きい場合、複数のゴルーチンが真に並行して実行される可能性があり、データ競合のリスクが高まります。
データ競合 (Race Condition)
データ競合とは、複数のゴルーチン(またはスレッド)が共有データに同時にアクセスし、少なくとも1つのアクセスが書き込みであり、かつそれらのアクセスが同期されていない場合に発生するバグです。結果として、プログラムの実行結果が非決定論的になり、予測不能な動作を引き起こす可能性があります。
アトミック操作 (CompareAndSwap - CAS)
アトミック操作は、複数の操作(読み込み、変更、書き込みなど)が不可分な単一の操作として実行されることを保証する低レベルのプリミティブです。これにより、他のゴルーチンがその操作の途中で共有データにアクセスすることを防ぎ、データ競合を回避できます。
- CompareAndSwap (CAS): 共有メモリ位置の現在の値が期待される値と一致する場合にのみ、そのメモリ位置の値を新しい値に更新するアトミック操作です。もし現在の値が期待される値と異なる場合(つまり、他のゴルーチンによって値が変更された場合)、CASは失敗し、更新は行われません。CASは通常、ループ内で使用され、操作が成功するまでリトライされます。
Goのreflect
パッケージ
reflect
パッケージは、実行時にプログラムの構造(型、変数、関数など)を検査・操作するための機能を提供します。これにより、Goの静的型付けの制約を超えて、動的なプログラミングが可能になります。マップに対しても、reflect.ValueOf(m).MapKeys()
やmv.MapIndex(k)
のように、リフレクションを通じてキーの列挙や値の取得を行うことができます。リフレクションを通じた操作も、内部的にはランタイムのCコードを呼び出すため、ランタイムのデータ競合の影響を受けます。
技術的詳細
このコミットの主要な変更点は、Hmap
構造体のflags
フィールドの型変更と、そのフィールドへのアクセスにアトミック操作runtime·cas
を導入したことです。
-
Hmap
構造体のflags
フィールドの型変更と位置変更:- 変更前:
uint8 flags;
- 変更後:
uint32 flags;
flags
フィールドの型がuint8
(8ビット)からuint32
(32ビット)に変更されました。これにより、より多くのフラグを格納できるようになります。また、構造体内の位置もcount
フィールドの直後に移動しています。これは、メモリのアライメントやキャッシュ効率を考慮した変更である可能性がありますが、主な目的はアトミック操作の対象となるフィールドのサイズを標準的なワードサイズに合わせることです。runtime·cas
のようなアトミック操作は、通常、特定のワードサイズ(例: 32ビット、64ビット)のデータに対して最適化されています。
- 変更前:
-
hash_iter_init
関数におけるアトミック操作の導入:- この関数は、Goのマップをイテレートするためのイテレータを初期化する際に呼び出されます。イテレータが存在することを示す
Iterator
およびOldIterator
フラグをh->flags
に設定する必要があります。 - 変更前は単純なビットOR演算
h->flags |= Iterator | OldIterator;
でした。 - 変更後、この操作はCASループに置き換えられました。
このループは、for(;;) { old = h->flags; if((old&(Iterator|OldIterator)) == (Iterator|OldIterator)) break; // 既にフラグが立っていれば終了 if(runtime·cas(&h->flags, old, old|Iterator|OldIterator)) break; // CAS成功で終了 }
h->flags
の現在の値(old
)を読み込み、それに新しいフラグを追加した値(old|Iterator|OldIterator
)を計算します。そして、runtime·cas(&h->flags, old, new)
を呼び出し、h->flags
がold
と一致する場合にのみnew
に更新します。もしCASが失敗した場合(つまり、他のゴルーチンがh->flags
をold
から変更した場合)、ループは続行され、最新のh->flags
を読み込み直して再試行します。これにより、並行する更新が失われることなく、確実にフラグが設定されます。
- この関数は、Goのマップをイテレートするためのイテレータを初期化する際に呼び出されます。イテレータが存在することを示す
-
reflect·mapiterinit
関数におけるアトミック操作の導入:- この関数は、リフレクションを通じてマップのイテレータを初期化する際に呼び出されます。キーのサイズに基づいて、
CanFreeKey
またはCanFreeBucket
フラグをクリアする必要があります。 - 変更前は単純なビットAND演算と代入
h->flags = flags;
でした。 - 変更後、こちらもCASループに置き換えられました。
for(;;) { old = h->flags; if(old & IndirectKey) new = old & ~CanFreeKey; else new = old & ~CanFreeBucket; if(new == old) // 変更が不要であれば終了 break; if(runtime·cas(&h->flags, old, new)) // CAS成功で終了 break; }
hash_iter_init
と同様に、h->flags
の現在の値old
を読み込み、必要なフラグをクリアした新しい値new
を計算します。new == old
の場合は変更が不要なのでループを抜けます。そうでなければ、runtime·cas
を使ってアトミックに更新を試み、成功するまでリトライします。
- この関数は、リフレクションを通じてマップのイテレータを初期化する際に呼び出されます。キーのサイズに基づいて、
-
テストケースの追加と修正 (
src/pkg/runtime/map_test.go
):- 既存の
TestConcurrentReadsAfterGrowth
関数がtestConcurrentReadsAfterGrowth
にリファクタリングされ、useReflect
という新しい引数が追加されました。 TestConcurrentReadsAfterGrowthReflect
という新しいテスト関数が追加され、testConcurrentReadsAfterGrowth(t, true)
を呼び出すことで、リフレクションを使ったマップの並行読み込みを明示的にテストするようになりました。- テスト内の
GOMAXPROCS
の設定ロジックがos.Getenv("GOMAXPROCS") == ""
からruntime.GOMAXPROCS(-1) == 1
に変更されました。これは、テストが単一CPU環境で実行されている場合に、一時的にGOMAXPROCS
を16に設定して並行性を強制するための、より堅牢な方法です。 useReflect
がtrue
の場合に、reflect.ValueOf(m).MapKeys()
とmv.MapIndex(k)
を使ってマップをイテレートする新しいゴルーチンが追加されました。これにより、リフレクション経由でのマップ操作が並行環境下で正しく動作するかどうかが検証されます。
- 既存の
これらの変更により、Goランタイムのマップ実装におけるflags
フィールドへの並行アクセスが安全になり、データ競合が解消されました。
コアとなるコードの変更箇所
src/pkg/runtime/hashmap.c
--- a/src/pkg/runtime/hashmap.c
+++ b/src/pkg/runtime/hashmap.c
@@ -95,8 +95,8 @@ struct Bucket
struct Hmap
{
uintgo count; // # live cells == size of map. Must be first (used by len() builtin)
+ uint32 flags;
uint8 B; // log_2 of # of buckets (can hold up to LOAD * 2^B items)
- uint8 flags;
uint8 keysize; // key size in bytes
uint8 valuesize; // value size in bytes
uint16 bucketsize; // bucket size in bytes
@@ -767,6 +767,8 @@ struct hash_iter
static void
hash_iter_init(MapType *t, Hmap *h, struct hash_iter *it)
{
+ uint32 old;
+
if(sizeof(struct hash_iter) / sizeof(uintptr) != 11) {
runtime·throw("hash_iter size incorrect"); // see ../../cmd/gc/range.c
}
@@ -783,7 +785,14 @@ hash_iter_init(MapType *t, Hmap *h, struct hash_iter *it)
it->bptr = nil;
// Remember we have an iterator.
- h->flags |= Iterator | OldIterator; // careful: see issue 5120.
+ // Can run concurrently with another hash_iter_init() and with reflect·mapiterinit().
+ for(;;) {
+ old = h->flags;
+ if((old&(Iterator|OldIterator)) == (Iterator|OldIterator))
+ break;
+ if(runtime·cas(&h->flags, old, old|Iterator|OldIterator))
+ break;
+ }
if(h->buckets == nil) {
// Empty map. Force next hash_next to exit without
@@ -1370,18 +1379,24 @@ runtime·mapiterinit(MapType *t, Hmap *h, struct hash_iter *it)\n void\n reflect·mapiterinit(MapType *t, Hmap *h, struct hash_iter *it)\n {\n-\tuint8 flags;\n+\tuint32 old, new;\n \n \tif(h != nil && t->key->size > sizeof(void*)) {\n \t\t// reflect·mapiterkey returns pointers to key data,\n \t\t// and reflect holds them, so we cannot free key data\n \t\t// eagerly anymore.\n-\t\tflags = h->flags;\n-\t\tif(flags & IndirectKey)\n-\t\t\tflags &= ~CanFreeKey;\n-\t\telse\n-\t\t\tflags &= ~CanFreeBucket;\n-\t\th->flags = flags;\n+\t\t// Can run concurrently with another reflect·mapiterinit() and with hash_iter_init().\n+\t\tfor(;;) {\n+\t\t\told = h->flags;\n+\t\t\tif(old & IndirectKey)\n+\t\t\t\tnew = old & ~CanFreeKey;\n+\t\t\telse\t\t\t\tnew = old & ~CanFreeBucket;\n+\t\t\tif(new == old)\n+\t\t\t\tbreak;\n+\t\t\tif(runtime·cas(&h->flags, old, new))\n+\t\t\t\tbreak;\n+\t\t}\n \t}\n \n \tit = runtime·mal(sizeof *it);\n```
### `src/pkg/runtime/map_test.go`
```diff
--- a/src/pkg/runtime/map_test.go
+++ b/src/pkg/runtime/map_test.go
@@ -7,7 +7,7 @@ package runtime_test
import (
"fmt"
"math"
- "os"
+ "reflect"
"runtime"
"sort"
"strings"
@@ -234,8 +234,8 @@ func TestIterGrowWithGC(t *testing.T) {
}\n}\n\n-func TestConcurrentReadsAfterGrowth(t *testing.T) {\n- if os.Getenv(\"GOMAXPROCS\") == \"\" {\n+func testConcurrentReadsAfterGrowth(t *testing.T, useReflect bool) {\n+ if runtime.GOMAXPROCS(-1) == 1 {\n defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(16))\n }\n numLoop := 10\n@@ -262,12 +262,31 @@ func TestConcurrentReadsAfterGrowth(t *testing.T) {\n _ = m[key]\n }\n }()\n+ if useReflect {\n+ wg.Add(1)\n+ go func() {\n+ defer wg.Done()\n+ mv := reflect.ValueOf(m)\n+ keys := mv.MapKeys()\n+ for _, k := range keys {\n+ mv.MapIndex(k)\n+ }\n+ }()\n+ }\n }\n wg.Wait()\n }\n }\n}\n\n+func TestConcurrentReadsAfterGrowth(t *testing.T) {\n+\ttestConcurrentReadsAfterGrowth(t, false)\n+}\n+\n+func TestConcurrentReadsAfterGrowthReflect(t *testing.T) {\n+\ttestConcurrentReadsAfterGrowth(t, true)\n+}\n+\n func TestBigItems(t *testing.T) {\n var key [256]string\n for i := 0; i < 256; i++ {\n```
## コアとなるコードの解説
### `src/pkg/runtime/hashmap.c`
* **`struct Hmap`の`flags`フィールド**:
* `uint8 flags;` から `uint32 flags;` へと型が変更されました。これにより、`flags`フィールドが32ビット幅になり、より多くのビットフラグを格納できるようになるとともに、32ビットのアトミック操作が効率的に行えるようになります。
* `uintgo count;` の直後に移動しました。これは、構造体のアライメントやキャッシュラインの最適化に寄与する可能性があります。
* **`hash_iter_init`関数**:
* マップのイテレータを初期化する際に、`h->flags`に`Iterator`と`OldIterator`フラグを設定する部分が変更されました。
* 変更前は単純な`h->flags |= Iterator | OldIterator;`という非アトミックな操作でした。
* 変更後は、`for(;;)`ループ内で`runtime·cas`(CompareAndSwap)関数を使用するようになりました。
1. まず、`h->flags`の現在の値`old`を読み取ります。
2. もし`old`に既に`Iterator`と`OldIterator`の両方のフラグが立っていれば、それ以上変更する必要はないためループを抜けます。
3. そうでなければ、`old`に`Iterator`と`OldIterator`フラグを立てた新しい値`new`を計算します。
4. `runtime·cas(&h->flags, old, new)`を呼び出します。これは、`h->flags`が`old`と等しい場合にのみ`h->flags`を`new`に更新し、成功した場合は`true`を返します。
5. CASが成功すればループを抜けます。失敗した場合(他のゴルーチンが`h->flags`を`old`から変更したため)、ループは続行され、最新の`h->flags`を読み込み直して再試行します。
* このCASループにより、複数のゴルーチンが同時にイテレータを初期化しようとしても、`flags`フィールドの更新がアトミックに行われ、フラグの喪失を防ぐことができます。
* **`reflect·mapiterinit`関数**:
* リフレクションを通じてマップのイテレータを初期化する際に、`h->flags`から`CanFreeKey`または`CanFreeBucket`フラグをクリアする部分が変更されました。
* 変更前は、`flags`変数を介して非アトミックに操作し、最後に`h->flags = flags;`で書き戻していました。
* 変更後は、`hash_iter_init`と同様に`for(;;)`ループ内で`runtime·cas`を使用するようになりました。
1. `h->flags`の現在の値`old`を読み取ります。
2. `old`の値に基づいて、`CanFreeKey`または`CanFreeBucket`フラグをクリアした新しい値`new`を計算します。
3. もし`new`が`old`と等しい場合(つまり、フラグの変更が不要な場合)、ループを抜けます。
4. そうでなければ、`runtime·cas(&h->flags, old, new)`を呼び出し、アトミックに更新を試みます。
5. CASが成功すればループを抜けます。失敗した場合は再試行します。
* これにより、リフレクション経由でのマップ操作も並行環境下で安全に行われるようになります。
### `src/pkg/runtime/map_test.go`
* **`reflect`パッケージのインポート**:
* 新しく追加されたリフレクション関連のテストのために、`reflect`パッケージがインポートされました。
* **`TestConcurrentReadsAfterGrowth`のリファクタリングと`testConcurrentReadsAfterGrowth`の導入**:
* 元の`TestConcurrentReadsAfterGrowth`関数が`testConcurrentReadsAfterGrowth`というヘルパー関数にリファクタリングされ、`useReflect bool`という引数が追加されました。
* これにより、通常のマップアクセスとリフレクションを使ったマップアクセスの両方を同じテストロジックで検証できるようになりました。
* **`GOMAXPROCS`の設定ロジックの改善**:
* テストの冒頭で、`if os.Getenv("GOMAXPROCS") == ""`というチェックが`if runtime.GOMAXPROCS(-1) == 1`に変更されました。
* `runtime.GOMAXPROCS(-1)`は現在の`GOMAXPROCS`の値を変更せずに取得します。もしこれが1(つまり単一CPUモード)であれば、`defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(16))`によって、テスト実行中は一時的に`GOMAXPROCS`を16に設定し、テスト終了後に元の値に戻します。これにより、テストが確実に並行環境下で実行され、競合状態が再現されやすくなります。
* **リフレクションを使った並行読み込みのテスト追加**:
* `testConcurrentReadsAfterGrowth`関数内で、`useReflect`が`true`の場合に実行される新しいゴルーチンが追加されました。
* このゴルーチンは、`reflect.ValueOf(m).MapKeys()`でマップのキーを取得し、`mv.MapIndex(k)`で各キーに対応する値を読み込みます。
* これにより、リフレクションAPIを介したマップの並行読み込みが、`flags`フィールドのアトミック操作の恩恵を受けて正しく動作するかどうかが検証されます。
* **新しいテスト関数 `TestConcurrentReadsAfterGrowthReflect`**:
* この関数は、`testConcurrentReadsAfterGrowth(t, true)`を呼び出すことで、リフレクションを使ったマップの並行読み込みテストを明示的に実行します。
これらの変更は、Goのマップが内部的にどのように並行性を扱っているか、そして低レベルなランタイムコードでデータ競合をどのように解決しているかを示す良い例です。
## 関連リンク
* Go issue 5120 (関連する可能性のあるGoのissue): [https://github.com/golang/go/issues/5120](https://github.com/golang/go/issues/5120) (コミットメッセージに`// careful: see issue 5120.`とあるため)
* Goの`sync/atomic`パッケージのドキュメント: [https://pkg.go.dev/sync/atomic](https://pkg.go.dev/sync/atomic) (Go言語におけるアトミック操作の概念を理解する上で役立ちます)
## 参考にした情報源リンク
* Goのソースコード: `src/runtime/map.go`, `src/runtime/hashmap.go` (現在のGoのマップ実装はGoで書かれていますが、このコミット当時の実装はC言語でした。概念は共通しています。)
* Goの`reflect`パッケージのドキュメント: [https://pkg.go.dev/reflect](https://pkg.go.dev/reflect)
* Goの`runtime`パッケージのドキュメント: [https://pkg.go.dev/runtime](https://pkg.go.dev/runtime)
* データ競合とアトミック操作に関する一般的な情報源 (例: Wikipedia, 並行プログラミングの教科書など)
* Goのマップの内部実装に関するブログ記事や解説 (例: "Go's map implementation details")