Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

[インデックス 18545] ファイルの概要

このコミットは、Go言語のtestingパッケージにb.RunParallel関数を追加し、並行ベンチマークの記述を容易にすることを目的としています。これにより、ワーカーゴルーチンの作成、結合、効率的な作業の分散、および粒度(grain size)の自動調整といった並行ベンチマークにおける定型的な処理が抽象化されます。

コミット

commit c3922f0a63b826000e6ab46cadc344782fbae178
Author: Dmitriy Vyukov <dvyukov@google.com>
Date:   Mon Feb 17 06:29:56 2014 +0400

    testing: ease writing parallel benchmarks
    Add b.RunParallel function that captures parallel benchmark boilerplate:
    creates worker goroutines, joins worker goroutines, distributes work
    among them in an efficient way, auto-tunes grain size.
    Fixes #7090.
    
    R=bradfitz, iant, josharian, tracey.brendan, r, rsc, gobot
    CC=golang-codereviews
    https://golang.org/cl/57270043
---
 doc/go1.3.txt                     |   1 +
 src/pkg/runtime/chan_test.go      | 213 +++++++++++++-------------------------
 src/pkg/testing/benchmark.go      |  97 +++++++++++++++--
 src/pkg/testing/benchmark_test.go |  54 ++++++++++\n src/pkg/testing/testing.go        |  16 +++
 5 files changed, 236 insertions(+), 145 deletions(-)

diff --git a/doc/go1.3.txt b/doc/go1.3.txt
index d2ba78dddd..cdf241ae5a 100644
--- a/doc/go1.3.txt
+++ b/doc/go1.3.txt
@@ -4,5 +4,6 @@ misc/dist: renamed misc/makerelease (CL 39920043)
 runtime: output how long goroutines are blocked (CL 50420043)
 syscall: add NewCallbackCDecl to use for windows callbacks (CL 36180044)
 testing: diagnose buggy tests that panic(nil) (CL 55780043)
+testing: add b.RunParallel function (CL 57270043)
 misc/benchcmp has been replaced by go tool benchcmp (CL 47980043)
 cmd/go, go/build: support .m files (CL 60590044)
diff --git a/src/pkg/runtime/chan_test.go b/src/pkg/runtime/chan_test.go
index 3ee7fe465d..782176c883 100644
--- a/src/pkg/runtime/chan_test.go
+++ b/src/pkg/runtime/chan_test.go
@@ -455,146 +455,93 @@ func BenchmarkChanNonblocking(b *testing.B) {
 }
 
 func BenchmarkSelectUncontended(b *testing.B) {
-	const CallsPerSched = 1000
-	procs := runtime.GOMAXPROCS(-1)
-	N := int32(b.N / CallsPerSched)
-	c := make(chan bool, procs)
-	for p := 0; p < procs; p++ {
-		go func() {
-			myc1 := make(chan int, 1)
-			myc2 := make(chan int, 1)
-			myc1 <- 0
-			for atomic.AddInt32(&N, -1) >= 0 {
-				for g := 0; g < CallsPerSched; g++ {
-					select {
-					case <-myc1:
-						myc2 <- 0
-					case <-myc2:
-						myc1 <- 0
-					}
-				}
-			}
-			c <- true
-		}()
-	}
-	for p := 0; p < procs; p++ {
-		<-c
-	}
+	b.RunParallel(func(pb *testing.PB) {
+		myc1 := make(chan int, 1)
+		myc2 := make(chan int, 1)
+		myc1 <- 0
+		for pb.Next() {
+			select {
+			case <-myc1:
+				myc2 <- 0
+			case <-myc2:
+				myc1 <- 0
+			}
+		}
+	})
 }
 
 func BenchmarkSelectContended(b *testing.B) {
-	const CallsPerSched = 1000
-	procs := runtime.GOMAXPROCS(-1)
-	N := int32(b.N / CallsPerSched)
-	c := make(chan bool, procs)
+	procs := runtime.GOMAXPROCS(0)
 	myc1 := make(chan int, procs)
 	myc2 := make(chan int, procs)
-	for p := 0; p < procs; p++ {
+	b.RunParallel(func(pb *testing.PB) {
 		myc1 <- 0
-		go func() {
-			for atomic.AddInt32(&N, -1) >= 0 {
-				for g := 0; g < CallsPerSched; g++ {
-					select {
-					case <-myc1:
-						myc2 <- 0
-					case <-myc2:
-						myc1 <- 0
-					}
-				}
-			}
-			c <- true
-		}()
-	}
-	for p := 0; p < procs; p++ {
-		<-c
-	}
+		for pb.Next() {
+			select {
+			case <-myc1:
+				myc2 <- 0
+			case <-myc2:
+				myc1 <- 0
+			}
+		}
+	})
 }
 
 func BenchmarkSelectNonblock(b *testing.B) {
-	const CallsPerSched = 1000
-	procs := runtime.GOMAXPROCS(-1)
-	N := int32(b.N / CallsPerSched)
-	c := make(chan bool, procs)
-	for p := 0; p < procs; p++ {
-		go func() {
-			myc1 := make(chan int)
-			myc2 := make(chan int)
-			myc3 := make(chan int, 1)
-			myc4 := make(chan int, 1)
-			for atomic.AddInt32(&N, -1) >= 0 {
-				for g := 0; g < CallsPerSched; g++ {
-					select {
-					case <-myc1:
-					default:
-					}
-					select {
-					case myc2 <- 0:
-					default:
-					}
-					select {
-					case <-myc3:
-					default:
-					}
-					select {
-					case myc4 <- 0:
-					default:
-					}
-				}
-			}
-			c <- true
-		}()
-	}
-	for p := 0; p < procs; p++ {
-		<-c
-	}
+	b.RunParallel(func(pb *testing.PB) {
+		myc1 := make(chan int)
+		myc2 := make(chan int)
+		myc3 := make(chan int, 1)
+		myc4 := make(chan int, 1)
+		for pb.Next() {
+			select {
+			case <-myc1:
+			default:
+			}
+			select {
+			case myc2 <- 0:
+			default:
+			}
+			select {
+			case <-myc3:
+			default:
+			}
+			select {
+			case myc4 <- 0:
+			default:
+			}
+		}
+	})
 }
 
 func BenchmarkChanUncontended(b *testing.B) {
-	const CallsPerSched = 1000
-	procs := runtime.GOMAXPROCS(-1)
-	N := int32(b.N / CallsPerSched)
-	c := make(chan bool, procs)
-	for p := 0; p < procs; p++ {
-		go func() {
-			myc := make(chan int, CallsPerSched)
-			for atomic.AddInt32(&N, -1) >= 0 {
-				for g := 0; g < CallsPerSched; g++ {
-					myc <- 0
-				}
-				for g := 0; g < CallsPerSched; g++ {
-					<-myc
-				}
-			}
-			c <- true
-		}()
-	}
-	for p := 0; p < procs; p++ {
-		<-c
-	}
+	const C = 100
+	b.RunParallel(func(pb *testing.PB) {
+		myc := make(chan int, C)
+		for pb.Next() {
+			for i := 0; i < C; i++ {
+				myc <- 0
+			}
+			for i := 0; i < C; i++ {
+				<-myc
+			}
+		}
+	})
 }
 
 func BenchmarkChanContended(b *testing.B) {
-	const CallsPerSched = 1000
-	procs := runtime.GOMAXPROCS(-1)
-	N := int32(b.N / CallsPerSched)
-	c := make(chan bool, procs)
-	myc := make(chan int, procs*CallsPerSched)
-	for p := 0; p < procs; p++ {
-		go func() {
-			for atomic.AddInt32(&N, -1) >= 0 {
-				for g := 0; g < CallsPerSched; g++ {
-					myc <- 0
-				}
-				for g := 0; g < CallsPerSched; g++ {
-					<-myc
-				}
-			}
-			c <- true
-		}()
-	}
-	for p := 0; p < procs; p++ {
-		<-c
-	}
+	const C = 100
+	myc := make(chan int, C*runtime.GOMAXPROCS(0))
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			for i := 0; i < C; i++ {
+				myc <- 0
+			}
+			for i := 0; i < C; i++ {
+				<-myc
+			}
+		}
+	})
 }
 
 func BenchmarkChanSync(b *testing.B) {
@@ -755,25 +702,13 @@ func BenchmarkSelectProdCons(b *testing.B) {
 }
 
 func BenchmarkChanCreation(b *testing.B) {
-	const CallsPerSched = 1000
-	procs := runtime.GOMAXPROCS(-1)
-	N := int32(b.N / CallsPerSched)
-	c := make(chan bool, procs)
-	for p := 0; p < procs; p++ {
-		go func() {
-			for atomic.AddInt32(&N, -1) >= 0 {
-				for g := 0; g < CallsPerSched; g++ {
-					myc := make(chan int, 1)
-					myc <- 0
-					<-myc
-				}
-			}
-			c <- true
-		}()
-	}
-	for p := 0; p < procs; p++ {
-		<-c
-	}
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			myc := make(chan int, 1)
+			myc <- 0
+			<-myc
+		}
+	})
 }
 
 func BenchmarkChanSem(b *testing.B) {
diff --git a/src/pkg/testing/benchmark.go b/src/pkg/testing/benchmark.go
index cff0774179..e6f3c6d790 100644
--- a/src/pkg/testing/benchmark.go
+++ b/src/pkg/testing/benchmark.go
@@ -10,6 +10,7 @@ import (
 	"os"
 	"runtime"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -34,12 +35,15 @@ type InternalBenchmark struct {
 // timing and to specify the number of iterations to run.
 type B struct {
 	common
-	N               int
-	benchmark       InternalBenchmark
-	bytes           int64
-	timerOn         bool
-	showAllocResult bool
-	result          BenchmarkResult
+	N                int
+	previousN        int           // number of iterations in the previous run
+	previousDuration time.Duration // total duration of the previous run
+	benchmark        InternalBenchmark
+	bytes            int64
+	timerOn          bool
+	showAllocResult  bool
+	result           BenchmarkResult
+	parallelism      int // RunParallel creates parallelism*GOMAXPROCS goroutines
 	// The initial states of memStats.Mallocs and memStats.TotalAlloc.
 	startAllocs uint64
 	startBytes  uint64
@@ -114,10 +118,13 @@ func (b *B) runN(n int) {
 	// by clearing garbage from previous runs.
 	runtime.GC()
 	b.N = n
+	b.parallelism = 1
 	b.ResetTimer()
 	b.StartTimer()
 	b.benchmark.F(b)
 	b.StopTimer()
+	b.previousN = n
+	b.previousDuration = b.duration
 }
 
 func min(x, y int) int {
@@ -343,6 +350,84 @@ func (b *B) trimOutput() {
 	}
 }\n 
+// A PB is used by RunParallel for running parallel benchmarks.
+type PB struct {
+	globalN *uint64 // shared between all worker goroutines iteration counter
+	grain   uint64  // acquire that many iterations from globalN at once
+	cache   uint64  // local cache of acquired iterations
+	bN      uint64  // total number of iterations to execute (b.N)
+}\n 
+// Next reports whether there are more iterations to execute.
+func (pb *PB) Next() bool {
+	if pb.cache == 0 {
+		n := atomic.AddUint64(pb.globalN, pb.grain)
+		if n <= pb.bN {
+			pb.cache = pb.grain
+		} else if n < pb.bN+pb.grain {
+			pb.cache = pb.bN + pb.grain - n
+		} else {
+			return false
+		}
+	}
+	pb.cache--
+	return true
+}\n 
+// RunParallel runs a benchmark in parallel.
+// It creates multiple goroutines and distributes b.N iterations among them.
+// The number of goroutines defaults to GOMAXPROCS. To increase parallelism for
+// non-CPU-bound benchmarks, call SetParallelism before RunParallel.
+// RunParallel is usually used with the go test -cpu flag.
+//
+// The body function will be run in each goroutine. It should set up any
+// goroutine-local state and then iterate until pb.Next returns false.
+// It should not use the StartTimer, StopTimer, or ResetTimer functions,
+// because they have global effect.
+func (b *B) RunParallel(body func(*PB)) {
+	// Calculate grain size as number of iterations that take ~100µs.
+	// 100µs is enough to amortize the overhead and provide sufficient
+	// dynamic load balancing.
+	grain := uint64(0)
+	if b.previousN > 0 && b.previousDuration > 0 {
+		grain = 1e5 * uint64(b.previousN) / uint64(b.previousDuration)
+	}
+	if grain < 1 {
+		grain = 1
+	}
+	// We expect the inner loop and function call to take at least 10ns,
+	// so do not do more than 100µs/10ns=1e4 iterations.
+	if grain > 1e4 {
+		grain = 1e4
+	}
+
+	n := uint64(0)
+	numProcs := b.parallelism * runtime.GOMAXPROCS(0)
+	var wg sync.WaitGroup
+	wg.Add(numProcs)
+	for p := 0; p < numProcs; p++ {
+		go func() {
+			defer wg.Done()
+			pb := &PB{
+				globalN: &n,
+				grain:   grain,
+				bN:      uint64(b.N),
+			}
+			body(pb)
+		}()
+	}
+	wg.Wait()
+}\n 
+// SetParallelism sets the number of goroutines used by RunParallel to p*GOMAXPROCS.
+// There is usually no need to call SetParallelism for CPU-bound benchmarks.
+// If p is less than 1, this call will have no effect.
+func (b *B) SetParallelism(p int) {
+	if p >= 1 {
+		b.parallelism = p
+	}
+}\n 
 // Benchmark benchmarks a single function. Useful for creating
 // custom benchmarks that do not use the "go test" command.
 func Benchmark(f func(b *B)) BenchmarkResult {
diff --git a/src/pkg/testing/benchmark_test.go b/src/pkg/testing/benchmark_test.go
index 94e994dfae..9997b99204 100644
--- a/src/pkg/testing/benchmark_test.go
+++ b/src/pkg/testing/benchmark_test.go
@@ -5,7 +5,11 @@
 package testing_test
 
 import (
+	"bytes"
+	"runtime"
+	"sync/atomic"
 	"testing"
+	"text/template"
 )
 
 var roundDownTests = []struct {
@@ -56,3 +60,53 @@ func TestRoundUp(t *testing.T) {
 		}
 	}
 }\n 
+func TestRunParallel(t *testing.T) {
+	testing.Benchmark(func(b *testing.B) {
+		procs := uint32(0)
+		iters := uint64(0)
+		b.SetParallelism(3)
+		b.RunParallel(func(pb *testing.PB) {
+			atomic.AddUint32(&procs, 1)
+			for pb.Next() {
+				atomic.AddUint64(&iters, 1)
+			}
+		})
+		if want := uint32(3 * runtime.GOMAXPROCS(0)); procs != want {
+			t.Errorf("got %v procs, want %v", procs, want)
+		}
+		if iters != uint64(b.N) {
+			t.Errorf("got %v iters, want %v", iters, b.N)
+		}
+	})
+}\n 
+func TestRunParallelFail(t *testing.T) {
+	testing.Benchmark(func(b *testing.B) {
+		b.RunParallel(func(pb *testing.PB) {
+			// The function must be able to log/abort
+			// w/o crashing/deadlocking the whole benchmark.
+			b.Log("log")
+			b.Error("error")
+			b.Fatal("fatal")
+		})
+	})
+}\n 
+func ExampleB_RunParallel() {
+	// Parallel benchmark for text/template.Template.Execute on a single object.
+	testing.Benchmark(func(b *testing.B) {
+		templ := template.Must(template.New("test").Parse("Hello, {{.}}!"))
+		// RunParallel will create GOMAXPROCS goroutines
+		// and distribute work among them.
+		b.RunParallel(func(pb *testing.PB) {
+			// Each goroutine has its own bytes.Buffer.
+			var buf bytes.Buffer
+			for pb.Next() {
+				// The loop body is executed b.N times total across all goroutines.
+				buf.Reset()
+				templ.Execute(&buf, "World")
+			}
+		})
+	})
+}\ndiff --git a/src/pkg/testing/testing.go b/src/pkg/testing/testing.go
index 826d8e0120..855f3a9bbe 100644
--- a/src/pkg/testing/testing.go
+++ b/src/pkg/testing/testing.go
@@ -43,6 +43,7 @@
 //
 // If a benchmark needs some expensive setup before running, the timer
 // may be reset:\n+//\n //     func BenchmarkBigLen(b *testing.B) {
 //         big := NewBig()\n //         b.ResetTimer()\n@@ -51,6 +52,21 @@
 //         }\n //     }\n //\n+// If a benchmark needs to test performance in a parallel setting, it may use
+// the RunParallel helper function; such benchmarks are intended to be used with
+// the go test -cpu flag:\n+//\n+//     func BenchmarkTemplateParallel(b *testing.B) {
+//         templ := template.Must(template.New("test").Parse("Hello, {{.}}!"))
+//         b.RunParallel(func(pb *testing.PB) {
+//             var buf bytes.Buffer
+//             for pb.Next() {
+//                 buf.Reset()
+//                 templ.Execute(&buf, "World")
+//             }\n //         })\n //     }\n //\n // Examples
 //\n // The package also runs and verifies example code. Example functions may\n```

## GitHub上でのコミットページへのリンク

[https://github.com/golang/go/commit/c3922f0a63b826000e6ab46cadc344782fbae178](https://github.com/golang/go/commit/c3922f0a63b826000e6ab46cadc344782fbae178)

## 元コミット内容

`testing: ease writing parallel benchmarks`
`b.RunParallel`関数を追加し、並行ベンチマークの定型的な処理(ワーカーゴルーチンの作成、結合、効率的な作業の分散、粒度の自動調整)をカプセル化する。
Issue #7090 を修正。

## 変更の背景

Go言語のベンチマーク機能は、単一のゴルーチンで実行されるシーケンシャルな処理のパフォーマンス測定には優れていました。しかし、複数のゴルーチンが並行して動作するような、並行処理のパフォーマンスを正確に測定するための標準的なメカニズムが不足していました。

従来の並行ベンチマークを記述するには、ユーザーが手動で以下の処理を行う必要がありました。
- 複数のゴルーチンを起動し、それぞれにベンチマークの作業を割り当てる。
- 各ゴルーチンが完了するのを待つために`sync.WaitGroup`などの同期プリミティブを使用する。
- `b.N`(ベンチマークのイテレーション数)を複数のゴルーチン間で効率的に分散させるロジックを実装する。
- ベンチマークの実行中に、各ゴルーチンが適切な量の作業を取得し、ロードバランスが適切に行われるように、作業の「粒度(grain size)」を調整する。

これらの処理は複雑で、エラーが発生しやすく、ベンチマークコードの可読性と保守性を低下させていました。特に、粒度の調整はベンチマーク結果の正確性に大きく影響するため、非常に重要でありながらも実装が困難な部分でした。

このコミットは、これらの課題を解決し、開発者がより簡単に、かつ正確に並行ベンチマークを記述できるようにするために`b.RunParallel`関数を導入しました。これにより、並行処理のパフォーマンス特性をより深く理解し、最適化を進めることが可能になります。

なお、コミットメッセージに記載されている`Fixes #7090`について、Web検索ではこのコミットに直接関連するGoのIssue #7090は見つかりませんでした。検索結果は別のセキュリティ脆弱性に関するものであり、このコミットの文脈とは異なります。

## 前提知識の解説

### Go言語の`testing`パッケージ
Go言語には、テストとベンチマークのための組み込みパッケージ`testing`があります。
- **テスト**: `func TestXxx(t *testing.T)`という形式の関数で記述され、コードの正確性を検証します。
- **ベンチマーク**: `func BenchmarkXxx(b *testing.B)`という形式の関数で記述され、コードのパフォーマンスを測定します。`b.N`というフィールドがあり、ベンチマーク対象のコードが実行されるイテレーション数を表します。ベンチマーク実行時には、`b.N`の値が自動的に調整され、十分な時間(デフォルトでは1秒)実行されるようにします。

### `b.N`
`testing.B`構造体のフィールドで、ベンチマーク対象のコードが実行される回数を示します。ベンチマークランナーは、この`b.N`の値を動的に調整し、統計的に有意な結果が得られるようにします。

### `GOMAXPROCS`
Goランタイムが同時に実行できるOSスレッドの最大数を制御する環境変数、または`runtime.GOMAXPROCS`関数で設定できる値です。デフォルトでは、利用可能なCPUコア数に設定されます。並行ベンチマークでは、この値がワーカーゴルーチンの数に影響を与えることがあります。

### ゴルーチン (Goroutines)
Go言語の軽量な並行処理単位です。OSスレッドよりもはるかに軽量で、数百万個のゴルーチンを同時に実行することも可能です。`go`キーワードを使って関数を呼び出すことで起動します。

### チャネル (Channels)
ゴルーチン間で値を送受信するための通信メカニズムです。チャネルは、ゴルーチン間の同期と通信を安全に行うための主要な手段です。

### `sync.WaitGroup`
複数のゴルーチンが完了するのを待つために使用される同期プリミティブです。`Add`で待つゴルーチンの数を増やし、各ゴルーチンが完了時に`Done`を呼び出し、`Wait`で全てのゴルーチンが完了するのを待ちます。

### アトミック操作 (Atomic Operations)
複数のゴルーチンから同時にアクセスされる共有変数に対して、競合状態(race condition)を避けるために使用される不可分な操作です。`sync/atomic`パッケージで提供されます。

## 技術的詳細

`b.RunParallel`は、並行ベンチマークの実行を大幅に簡素化します。その内部動作は以下の要素で構成されています。

1.  **ワーカーゴルーチンの生成**: `b.RunParallel`は、`b.parallelism * runtime.GOMAXPROCS(0)`の数だけワーカーゴルーチンを生成します。`b.parallelism`はデフォルトで1ですが、`b.SetParallelism`で調整可能です。これにより、CPUコア数に応じた適切な並行度でベンチマークが実行されます。

2.  **作業の分散と粒度 (Grain Size) の自動調整**:
    - `b.N`で指定された総イテレーション数を、生成されたワーカーゴルーチン間で効率的に分散させます。
    - 各ワーカーゴルーチンは、`PB`(Parallel Benchmark)構造体を受け取ります。この`PB`構造体には、共有のイテレーションカウンター`globalN`、一度に取得するイテレーション数を示す`grain`、およびローカルキャッシュ`cache`が含まれます。
    - `grain`のサイズは、前回のベンチマーク実行時間とイテレーション数に基づいて自動的に計算されます。目標は、約100マイクロ秒かかるイテレーション数を`grain`とすることです。これは、オーバーヘッドを償却し、十分な動的ロードバランシングを提供するためです。`grain`は最低1、最大10000に制限されます。
    - 各ワーカーゴルーチンは、`pb.Next()`メソッドを呼び出すことで次のイテレーションがあるかを確認し、あれば実行します。`pb.Next()`は、`globalN`から`grain`数だけイテレーションをアトミックに取得し、ローカルキャッシュ`cache`に格納します。ローカルキャッシュが尽きると、再度`globalN`から取得します。これにより、競合を最小限に抑えつつ、効率的に作業を分散します。

3.  **ゴルーチンの結合**: `sync.WaitGroup`を使用して、全てのワーカーゴルーチンが`b.N`のイテレーションを完了するまで`b.RunParallel`がブロックします。

4.  **ベンチマークの実行ループ**: `b.RunParallel`に渡される`body`関数は、各ワーカーゴルーチンで実行されます。この`body`関数内で、`for pb.Next() { ... }`というループを記述することで、ベンチマーク対象のコードを`b.N`回実行します。`pb.Next()`は、まだ実行すべきイテレーションが残っている場合に`true`を返します。

`b.RunParallel`を使用するベンチマークでは、`b.StartTimer`、`b.StopTimer`、`b.ResetTimer`といったグローバルな効果を持つ関数は使用すべきではありません。これらの関数は`b.RunParallel`の内部で適切に管理されます。

## コアとなるコードの変更箇所

このコミットでは、主に以下のファイルが変更されています。

1.  **`doc/go1.3.txt`**: Go 1.3のリリースノートに`testing: add b.RunParallel function`が追加されました。
2.  **`src/pkg/runtime/chan_test.go`**: 既存のチャネル関連のベンチマーク(`BenchmarkSelectUncontended`, `BenchmarkSelectContended`, `BenchmarkSelectNonblock`, `BenchmarkChanUncontended`, `BenchmarkChanContended`, `BenchmarkChanCreation`)が、手動でゴルーチンを管理していたコードから`b.RunParallel`を使用するように書き換えられました。これにより、コードが大幅に簡素化されています。
3.  **`src/pkg/testing/benchmark.go`**:
    - `B`構造体に`previousN`, `previousDuration`, `parallelism`といった新しいフィールドが追加されました。これらは`b.RunParallel`の内部で粒度計算や並行度設定に使用されます。
    - `PB`構造体が新しく定義されました。これは`b.RunParallel`に渡されるワーカーゴルーチンごとのコンテキストを提供します。
    - `PB.Next()`メソッドが追加されました。これは、ワーカーゴルーチンが次のイテレーションを実行すべきかどうかを判断し、作業をアトミックに取得します。
    - `B.RunParallel(body func(*PB))`関数が追加されました。これが並行ベンチマークの主要なエントリポイントです。
    - `B.SetParallelism(p int)`関数が追加されました。これは`b.RunParallel`が生成するゴルーチンの数を調整するために使用されます。
4.  **`src/pkg/testing/benchmark_test.go`**: `TestRunParallel`と`ExampleB_RunParallel`が追加され、`b.RunParallel`の機能と使用例がテストおよびドキュメント化されました。
5.  **`src/pkg/testing/testing.go`**: `b.RunParallel`の使用例がコメントとして追加され、ドキュメントが更新されました。

## コアとなるコードの解説

### `src/pkg/testing/benchmark.go`

#### `type B struct { ... }` の変更点

```go
type B struct {
	common
	N                int
	previousN        int           // number of iterations in the previous run
	previousDuration time.Duration // total duration of the previous run
	benchmark        InternalBenchmark
	bytes            int64
	timerOn          bool
	showAllocResult  bool
	result           BenchmarkResult
	parallelism      int // RunParallel creates parallelism*GOMAXPROCS goroutines
	// The initial states of memStats.Mallocs and memStats.TotalAlloc.
	startAllocs uint64
	startBytes  uint64
}
  • previousNpreviousDuration: 前回のベンチマーク実行のイテレーション数と総実行時間を記録します。これらはRunParallel内で粒度(grain size)を計算するために使用されます。
  • parallelism: RunParallelが作成するゴルーチンの数をparallelism * GOMAXPROCSとして決定します。デフォルトは1です。

func (b *B) runN(n int) の変更点

func (b *B) runN(n int) {
	// ...
	runtime.GC()
	b.N = n
	b.parallelism = 1 // RunParallelのデフォルト並行度を1に設定
	b.ResetTimer()
	b.StartTimer()
	b.benchmark.F(b)
	b.StopTimer()
	b.previousN = n // 前回の実行情報を保存
	b.previousDuration = b.duration // 前回の実行情報を保存
}

runNはベンチマークの内部実行関数で、b.Nの値を設定し、タイマーをリセット・開始・停止します。この変更により、b.RunParallelが粒度を自動調整するために必要なpreviousNpreviousDurationが記録されるようになりました。

type PB struct { ... } の追加

type PB struct {
	globalN *uint64 // shared between all worker goroutines iteration counter
	grain   uint64  // acquire that many iterations from globalN at once
	cache   uint64  // local cache of acquired iterations
	bN      uint64  // total number of iterations to execute (b.N)
}
  • globalN: 全てのワーカーゴルーチンで共有される、総イテレーションカウンターへのポインタです。atomic操作で安全にアクセスされます。
  • grain: 各ワーカーゴルーチンがglobalNから一度に取得するイテレーション数です。
  • cache: grainで取得したイテレーションを一時的に保持するローカルキャッシュです。
  • bN: b.Nの総イテレーション数です。

func (pb *PB) Next() bool の追加

func (pb *PB) Next() bool {
	if pb.cache == 0 { // ローカルキャッシュが空の場合
		n := atomic.AddUint64(pb.globalN, pb.grain) // 共有カウンターからgrain数だけイテレーションを取得
		if n <= pb.bN { // 総イテレーション数を超えていない場合
			pb.cache = pb.grain
		} else if n < pb.bN+pb.grain { // 総イテレーション数を少し超えた場合(最後の塊)
			pb.cache = pb.bN + pb.grain - n
		} else { // 全てのイテレーションが完了した場合
			return false
		}
	}
	pb.cache-- // ローカルキャッシュから1つ消費
	return true
}

このメソッドは、並行ベンチマークの各ワーカーゴルーチンがループを継続すべきかどうかを判断します。

  • pb.cacheが0の場合、globalNからgrain数だけイテレーションをアトミックに取得し、pb.cacheに補充します。
  • globalNb.Nを超えた場合、残りのイテレーションを処理し、それ以上作業がない場合はfalseを返します。
  • pb.cacheから1つ消費し、trueを返します。

func (b *B) RunParallel(body func(*PB)) の追加

func (b *B) RunParallel(body func(*PB)) {
	// Calculate grain size as number of iterations that take ~100µs.
	// ...
	grain := uint64(0)
	if b.previousN > 0 && b.previousDuration > 0 {
		grain = 1e5 * uint64(b.previousN) / uint64(b.previousDuration) // 100µsあたりのイテレーション数を計算
	}
	if grain < 1 {
		grain = 1
	}
	if grain > 1e4 { // 最大10000に制限
		grain = 1e4
	}

	n := uint64(0) // 共有イテレーションカウンター
	numProcs := b.parallelism * runtime.GOMAXPROCS(0) // 起動するゴルーチン数
	var wg sync.WaitGroup
	wg.Add(numProcs)
	for p := 0; p < numProcs; p++ {
		go func() {
			defer wg.Done()
			pb := &PB{ // 各ゴルーチンにPBインスタンスを渡す
				globalN: &n,
				grain:   grain,
				bN:      uint64(b.N),
			}
			body(pb) // ユーザー定義のベンチマークロジックを実行
		}()
	}
	wg.Wait() // 全てのゴルーチンの完了を待つ
}
  • 粒度計算: b.previousNb.previousDurationを使用して、約100マイクロ秒で実行されるイテレーション数(grain)を計算します。これにより、動的なロードバランシングとオーバーヘッドの償却が最適化されます。
  • ゴルーチン起動: numProcsで計算された数のゴルーチンを起動します。
  • PBインスタンスの生成: 各ゴルーチンに、共有カウンターn、計算されたgrain、および総イテレーション数b.Nを含むPBインスタンスを渡します。
  • body関数の実行: 各ゴルーチン内で、ユーザーが定義したbody関数がpbを引数として実行されます。このbody関数内でfor pb.Next() { ... }ループを使用してベンチマーク対象のコードを実行します。
  • 待機: sync.WaitGroupを使用して、全てのワーカーゴルーチンが完了するまで待機します。

func (b *B) SetParallelism(p int) の追加

func (b *B) SetParallelism(p int) {
	if p >= 1 {
		b.parallelism = p
	}
}

RunParallelが起動するゴルーチンの数をp * GOMAXPROCSに設定します。CPUバウンドではないベンチマークで並行度を上げるために使用できます。

src/pkg/runtime/chan_test.go の変更点

このファイルでは、既存のチャネル関連のベンチマーク関数が、手動でゴルーチンを管理していた複雑なコードから、新しく追加されたb.RunParallelを使用するように書き換えられました。これにより、コードの行数が大幅に削減され、可読性が向上しています。

変更前(例: BenchmarkSelectUncontended: 手動でprocs数のゴルーチンを起動し、atomic.AddInt32で共有カウンターを減らし、chan boolでゴルーチンの完了を待っていました。

変更後(例: BenchmarkSelectUncontended:

func BenchmarkSelectUncontended(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		myc1 := make(chan int, 1)
		myc2 := make(chan int, 1)
		myc1 <- 0
		for pb.Next() { // b.RunParallelが提供するループ
			select {
			case <-myc1:
				myc2 <- 0
			case <-myc2:
				myc1 <- 0
			}
		}
	})
}

b.RunParallelに渡すbody関数内でfor pb.Next() { ... }ループを使用するだけで、並行処理の管理が自動的に行われるようになりました。これにより、ベンチマークコードがより簡潔で理解しやすくなっています。

関連リンク

参考にした情報源リンク