[インデックス 16525] ファイルの概要
このコミットは、Go言語の標準ライブラリsync
パッケージ内のWaitGroup
における競合検出(race instrumentation)を改善することを目的としています。具体的には、WaitGroup
のAdd
メソッドとWait
メソッドの同期メカニズムを調整し、一般的なWaitGroup
の誤用パターン(例: Add()
が追加のゴルーチン内で呼び出される、複数のウェイターによるWaitGroup
の不適切な再利用)をより効果的に検出できるようにしています。
コミット
commit 0f4897ae1a99d5c9de78c33c7b0963e71960c678
Author: Dmitriy Vyukov <dvyukov@google.com>
Date: Mon Jun 10 22:38:58 2013 +0400
sync: improve race instrumentation of WaitGroup
Do not synchronize Add(1) with Wait().
Imitate read on first Add(1) and write on Wait(),
it allows to catch common misuses of WaitGroup:
- Add() called in the additional goroutine itself
- incorrect reuse of WaitGroup with multiple waiters
R=golang-dev, iant
CC=golang-dev
https://golang.org/cl/10093044
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/0f4897ae1a99d5c9de78c33c7b0963e71960c678
元コミット内容
sync: improve race instrumentation of WaitGroup
Do not synchronize Add(1) with Wait().
Imitate read on first Add(1) and write on Wait(),
it allows to catch common misuses of WaitGroup:
- Add() called in the additional goroutine itself
- incorrect reuse of WaitGroup with multiple waiters
変更の背景
Goのsync.WaitGroup
は、複数のゴルーチンが完了するのを待つための同期プリミティブです。しかし、その使用方法にはいくつかの一般的な落とし穴があり、特に競合状態(race condition)を引き起こしやすいパターンが存在します。Goのレース検出器(Race Detector)は、これらの競合状態を特定する強力なツールですが、WaitGroup
の特定の誤用パターンに対しては、その検出能力が十分ではありませんでした。
このコミットの背景には、以下の2つの主要なWaitGroup
の誤用パターンをレース検出器でより確実に検出できるようにするという課題がありました。
Add()
が追加のゴルーチン内で呼び出されるケース:WaitGroup.Add(delta)
は通常、ゴルーチンを起動する前に呼び出され、そのゴルーチンが完了したときにDone()
が呼び出されることを期待します。しかし、誤ってAdd()
が起動されたゴルーチン内部で呼び出されると、Wait()
がAdd()
の呼び出しを待たずに先に完了してしまう可能性があり、競合状態やデッドロックにつながることがあります。- 複数のウェイターによる
WaitGroup
の不適切な再利用:WaitGroup
は、一度カウントがゼロになりWait()
が完了した後、再利用することができます。しかし、複数のゴルーチンが同時にWait()
を呼び出している状態でWaitGroup
を再利用しようとすると、予期せぬ競合状態が発生する可能性があります。
これらの誤用パターンは、プログラムの予期せぬ動作やクラッシュを引き起こす可能性があり、開発者がデバッグするのが困難な問題でした。このコミットは、レース検出器がこれらのシナリオを「競合」として報告するようにWaitGroup
の内部実装を調整することで、開発者がこれらの問題を早期に発見し、修正できるようにすることを目的としています。
前提知識の解説
このコミットの変更内容を理解するためには、以下の概念について理解しておく必要があります。
1. Goの並行性モデルとゴルーチン (Goroutines)
Go言語は、軽量な実行スレッドであるゴルーチンを用いて並行処理をサポートします。ゴルーチンはOSのスレッドよりもはるかに軽量で、数千、数万のゴルーチンを同時に実行することが可能です。
2. sync.WaitGroup
sync.WaitGroup
は、複数のゴルーチンが完了するまでメインゴルーチンが待機するための同期プリミティブです。以下の3つのメソッドを持ちます。
Add(delta int)
:WaitGroup
のカウンタにdelta
を加算します。通常、新しいゴルーチンを起動する前に呼び出され、待機するゴルーチンの数を設定します。Done()
:WaitGroup
のカウンタを1減らします。通常、ゴルーチンがその処理を完了したときに呼び出されます。これはAdd(-1)
のショートカットです。Wait()
:WaitGroup
のカウンタがゼロになるまでブロックします。
典型的な使用パターンは以下の通りです。
var wg sync.WaitGroup
for i := 0; i < N; i++ {
wg.Add(1) // ゴルーチン起動前にカウンタを増やす
go func() {
defer wg.Done() // ゴルーチン終了時にカウンタを減らす
// ... 処理 ...
}()
}
wg.Wait() // 全てのゴルーチンが完了するまで待つ
3. Goのレース検出器 (Race Detector)
Goのレース検出器は、並行プログラムにおける競合状態(race condition)を検出するためのツールです。競合状態とは、複数のゴルーチンが同時に同じメモリ位置にアクセスし、少なくとも1つのアクセスが書き込みである場合に発生するバグです。レース検出器は、プログラムの実行中にメモリアクセスを監視し、競合パターンを検出すると警告を出力します。
レース検出器は、go run -race
、go build -race
、go test -race
などのコマンドで有効にできます。内部的には、メモリへの読み書き操作をフックし、それらの操作の順序とタイミングを追跡することで競合を検出します。
4. メモリモデルと同期 (Memory Model and Synchronization)
Goのメモリモデルは、複数のゴルーチンが共有メモリにアクセスする際の動作を定義します。競合状態は、メモリモデルの保証に違反するアクセスパターンです。sync
パッケージのプリミティブ(Mutex
, WaitGroup
, Channel
など)は、これらのメモリモデルの保証を確立し、競合状態を回避するための同期メカニズムを提供します。
レース検出器は、これらの同期プリミティブが正しく使用されているかを検証するのに役立ちます。同期プリミティブは、特定のメモリ操作間に「happens-before」関係を確立し、これにより競合状態を回避します。
5. raceenabled
フラグとruntime/race
パッケージ
Goのビルドシステムには、レース検出器を有効にするためのraceenabled
という内部フラグがあります。このフラグが有効な場合、runtime/race
パッケージ内の関数が呼び出され、メモリアクセスがレース検出器に報告されます。
runtime.RaceRead(addr unsafe.Pointer)
: 指定されたメモリアドレスaddr
への読み込み操作をレース検出器に報告します。runtime.RaceWrite(addr unsafe.Pointer)
: 指定されたメモリアドレスaddr
への書き込み操作をレース検出器に報告します。runtime.RaceReleaseMerge(addr unsafe.Pointer)
: 特定の同期操作(例: ロックの解放)をレース検出器に報告し、メモリの同期ポイントを確立します。
これらの関数は、通常、開発者が直接呼び出すものではなく、sync
パッケージのような標準ライブラリの内部で、レース検出器との連携のために使用されます。
技術的詳細
このコミットの核心は、sync.WaitGroup
のAdd
メソッドとWait
メソッドの内部で、レース検出器に対する「擬似的な」メモリ読み書き操作を導入することにあります。これにより、WaitGroup
の誤用が実際のデータ競合としてレース検出器に認識されるようになります。
変更のポイントは以下の通りです。
-
WaitGroup.Add(delta int)
の変更:raceenabled
が有効な場合、delta
が負の値(つまりDone()
によるデクリメント)の場合にraceReleaseMerge(unsafe.Pointer(wg))
を呼び出します。これは、Done()
がWait()
と同期するためのリリース操作を模倣します。delta
が正の値(つまりインクリメント)の場合、かつwg.counter
がdelta
と等しい(つまり、WaitGroup
がゼロから初めてインクリメントされる)場合に、raceRead(unsafe.Pointer(&wg.sema))
を呼び出します。これは、Add(1)
がWaitGroup
の内部状態に対する「読み込み」操作であるとレース検出器に認識させます。この「読み込み」は、後述のWait()
の「書き込み」と競合する可能性があります。
-
WaitGroup.Wait()
の変更:raceenabled
が有効な場合、Wait()
が呼び出され、かつそれが最初のウェイターである場合(atomic.AddInt32(&wg.waiters, 1)
の結果が1の場合)に、raceWrite(unsafe.Pointer(&wg.sema))
を呼び出します。これは、Wait()
がWaitGroup
の内部状態に対する「書き込み」操作であるとレース検出器に認識させます。
なぜこのような擬似的な読み書きが必要なのか?
Goのレース検出器は、実際の共有メモリへの読み書き操作を監視することで競合を検出します。しかし、WaitGroup
の誤用は、必ずしも直接的な共有メモリへの競合アクセスとして現れるわけではありません。例えば、Add()
がゴルーチン内で遅れて呼び出され、そのゴルーチンがDone()
を呼び出す前にWait()
が完了してしまうようなケースでは、WaitGroup
のカウンタ自体へのアクセスはアトミックに行われているため、レース検出器は通常の競合として報告しません。
このコミットは、WaitGroup
の内部セマフォ(wg.sema
)のアドレスを「仮想的な共有メモリ」として利用し、Add()
の特定のケースを「読み込み」、Wait()
の特定のケースを「書き込み」としてレース検出器に報告することで、これらの論理的な競合を物理的なメモリ競合として擬似的に表現しています。
Add()
とWait()
の同期の模倣:Add(1)
がWaitGroup
の初期状態(カウンタが0)から初めてインクリメントされる際に「読み込み」を、そしてWait()
が最初のウェイターとして待機を開始する際に「書き込み」を模倣することで、これら2つの操作が適切な順序で発生しない場合にレース検出器が警告を発するようにします。これにより、Add()
がゴルーチン内で遅れて呼び出されるような誤用パターンが検出されます。WaitGroup
の再利用における競合の検出: 複数のWait()
呼び出しがある場合、最初のWait()
のみがraceWrite
を呼び出します。これは、複数のWait()
が同時に実行される場合に、それらが互いに競合しないようにするためです。しかし、WaitGroup
が不適切に再利用され、前のWait()
が完了する前に新しいAdd()
が呼び出されるようなシナリオでは、この擬似的な読み書きが競合として検出される可能性があります。
src/pkg/sync/race.go
と src/pkg/sync/race0.go
の変更
このコミットでは、レース検出器が有効な場合(race.go
)と無効な場合(race0.go
)の両方で、raceRead
とraceWrite
というヘルパー関数が追加されています。これらは、それぞれruntime.RaceRead
とruntime.RaceWrite
のラッパーであり、raceenabled
が有効な場合にのみ実際のランタイム関数を呼び出します。これにより、sync
パッケージのコードがraceenabled
の状態に依存せずに記述できるようになります。
src/pkg/runtime/race/testdata/waitgroup_test.go
の変更
このファイルには、WaitGroup
のレース検出に関するテストケースが含まれています。このコミットでは、新しいテストケースが追加され、既存のテストケースが修正されています。特に注目すべきは、TestRaceFailingWaitGroupWrongAdd
がTestRaceWaitGroupWrongAdd
にリネームされ、time.Sleep
が追加されて、Add()
がゴルーチン内で遅れて呼び出されるシナリオをより確実に再現し、レース検出器がそれを検出できるようにしている点です。また、TestNoRaceWaitGroupMultipleWait3
, TestNoRaceWaitGroupReuse
, TestNoRaceWaitGroupReuse2
, TestRaceWaitGroupReuse
, TestNoRaceWaitGroupConcurrentAdd
といった新しいテストが追加され、WaitGroup
の様々な使用パターン(特に再利用と複数のウェイター)におけるレース検出の挙動が検証されています。
コアとなるコードの変更箇所
src/pkg/sync/waitgroup.go
--- a/src/pkg/sync/waitgroup.go
+++ b/src/pkg/sync/waitgroup.go
@@ -43,12 +43,23 @@ type WaitGroup struct {
// other event to be waited for. See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
if raceenabled {
- _ = wg.m.state
- raceReleaseMerge(unsafe.Pointer(wg))
+ _ = wg.m.state // trigger nil deref early
+ if delta < 0 {
+ // Synchronize decrements with Wait.
+ raceReleaseMerge(unsafe.Pointer(wg))
+ }
raceDisable()
defer raceEnable()
}
v := atomic.AddInt32(&wg.counter, int32(delta))
+ if raceenabled {
+ if delta > 0 && v == int32(delta) {
+ // The first increment must be synchronized with Wait.
+ // Need to model this as a read, because there can be
+ // several concurrent wg.counter transitions from 0.
+ raceRead(unsafe.Pointer(&wg.sema))
+ }
+ }
if v < 0 {
panic("sync: negative WaitGroup counter")
}
@@ -72,7 +83,14 @@ func (wg *WaitGroup) Done() {
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
if raceenabled {
- _ = wg.m.state
+ _ = wg.m.state // trigger nil deref early
raceDisable()
}
if atomic.LoadInt32(&wg.counter) == 0 {
return
}
wg.m.Lock()
- atomic.AddInt32(&wg.waiters, 1)
+ w := atomic.AddInt32(&wg.waiters, 1)
+ if raceenabled && w == 1 {
+ // Wait's must be synchronized with the first Add.
+ // Need to model this is as a write to race with the read in Add.
+ // As the consequence, can do the write only for the first waiter,
+ // otherwise concurrent Wait's will race with each other.
+ raceWrite(unsafe.Pointer(&wg.sema))
+ }
// This code is racing with the unlocked path in Add above.
// The code above modifies counter and then reads waiters.
// We must modify waiters and then read counter (the opposite order)
src/pkg/sync/race.go
--- a/src/pkg/sync/race.go
+++ b/src/pkg/sync/race.go
@@ -32,3 +32,11 @@ func raceDisable() {
func raceEnable() {
runtime.RaceEnable()
}
+
+func raceRead(addr unsafe.Pointer) {
+ runtime.RaceRead(addr)
+}
+
+func raceWrite(addr unsafe.Pointer) {
+ runtime.RaceWrite(addr)
+}
src/pkg/sync/race0.go
--- a/src/pkg/sync/race0.go
+++ b/src/pkg/sync/race0.go
@@ -26,3 +26,9 @@ func raceDisable() {
func raceEnable() {
}
+
+func raceRead(addr unsafe.Pointer) {
+}
+
+func raceWrite(addr unsafe.Pointer) {
+}
コアとなるコードの解説
sync/waitgroup.go
の変更点
-
WaitGroup.Add(delta int)
メソッド:- 既存の
raceReleaseMerge(unsafe.Pointer(wg))
の呼び出しが、delta < 0
(つまりDone()
によるデクリメント)の場合に限定されました。これは、デクリメント操作がWait()
と同期する必要があるためです。 - 新しく追加されたブロックでは、
raceenabled
が有効で、かつdelta > 0
(インクリメント)であり、かつv == int32(delta)
(WaitGroup
のカウンタがゼロから初めてインクリメントされる場合)にraceRead(unsafe.Pointer(&wg.sema))
が呼び出されます。wg.sema
はWaitGroup
内部のセマフォであり、ここではレース検出器に「仮想的な共有メモリ」として扱わせるためのアドレスとして利用されます。- 「最初のインクリメント」を「読み込み」としてモデル化することで、
Add()
がゴルーチン内で遅れて呼び出され、Wait()
が先に実行されてしまうような誤用パターンを検出するためのフックとなります。
- 既存の
-
WaitGroup.Wait()
メソッド:atomic.AddInt32(&wg.waiters, 1)
でウェイターの数をインクリメントした後、その結果がw == 1
(つまり、このWait()
呼び出しが最初のウェイターである場合)にraceWrite(unsafe.Pointer(&wg.sema))
が呼び出されます。Wait()
を「書き込み」としてモデル化することで、Add()
の「読み込み」と競合する可能性を生み出します。- 「最初のウェイターのみ」が
raceWrite
を呼び出すのは、複数のWait()
が同時に実行される場合に、それら自身が競合として報告されないようにするためです。WaitGroup
の再利用時に、前のWait()
が完了する前に新しいAdd()
が呼び出されるようなシナリーで、この擬似的な読み書きが競合として検出されることを意図しています。
sync/race.go
と sync/race0.go
の変更点
sync/race.go
(レース検出器が有効なビルド用)には、runtime.RaceRead(addr unsafe.Pointer)
とruntime.RaceWrite(addr unsafe.Pointer)
をそれぞれ呼び出すraceRead
とraceWrite
関数が追加されました。sync/race0.go
(レース検出器が無効なビルド用)には、何もしない(no-op)raceRead
とraceWrite
関数が追加されました。
これらの変更により、sync/waitgroup.go
のコードは、raceenabled
フラグの状態を直接チェックすることなく、常にraceRead
とraceWrite
を呼び出すことができるようになり、コードの簡潔性が保たれています。実際のレース検出器への報告は、ビルド時にリンクされるrace.go
またはrace0.go
のどちらが使用されるかによって制御されます。
関連リンク
- Go Race Detector: https://go.dev/blog/race-detector
sync.WaitGroup
documentation: https://pkg.go.dev/sync#WaitGroup- Go Memory Model: https://go.dev/ref/mem
参考にした情報源リンク
- Goの公式ドキュメント
- Goのソースコード(特に
src/runtime/race
ディレクトリ) - GoのIssueトラッカーやコードレビューシステム(
golang.org/cl/10093044
) - Goのレース検出器に関するブログ記事や解説記事