Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

[インデックス 13459] ファイルの概要

このコミットは、Go言語の net/http パッケージにおける Transport の競合状態(レースコンディション)を修正し、特に GOMAXPROCS が高い値に設定されている環境下での安定性を向上させるものです。具体的には、アイドル接続プールの管理、永続的なHTTP接続(persistConn)のライフサイクル、およびレスポンスボディの読み取りに関する複数の競合状態に対処しています。これにより、高負荷時や並行処理が多い状況でのHTTPクライアントの信頼性が向上しました。

コミット

  • コミットハッシュ: 8a2a5013c962e4900eed0a3a4a471df293db3f40
  • 作者: Brad Fitzpatrick bradfitz@golang.org
  • コミット日時: 2012年7月11日 水曜日 16:40:44 -0700

GitHub上でのコミットページへのリンク

https://github.com/golang/go/commit/8a2a5013c962e4900eed0a3a4a471df293db3f40

元コミット内容

net/http: fix Transport race(s) with high GOMAXPROCS

Also adds a new test for GOMAXPROCS=16 explicitly, which now passes
reliably in a stress loop like:

$ go test -c
$ (while ./http.test -test.v -test.run=Concurrency; do echo pass; done ) 2>&1 | tee foo; less foo

(It used to fail very quickly and reliably on at least Linux/amd664)

Fixes #3793

R=golang-dev, adg, r
CC=golang-dev
https://golang.org/cl/6347061

変更の背景

このコミットの主な背景は、Goの net/http パッケージの Transport 実装における競合状態の存在です。特に、GOMAXPROCS 環境変数(Goランタイムが同時に実行できるOSスレッドの最大数を制御する)が高い値に設定されている場合に、これらの競合状態が顕在化しやすかったことが示唆されています。

net/http.Transport は、HTTPリクエストの送信、レスポンスの受信、接続の再利用(キープアライブ)などを管理するGoのHTTPクライアントの基盤となるコンポーネントです。効率的な接続再利用のために、アイドル状態の接続をプールするメカニズムを持っています。しかし、複数のゴルーチンが同時にこの接続プールにアクセスしたり、接続のライフサイクルを管理したりする際に、同期の問題が発生し、競合状態を引き起こす可能性がありました。

元のコミットメッセージによると、この問題は特に GOMAXPROCS=16 のような高い並行度設定で顕著であり、特定のテスト(TestTransportConcurrency)がLinux/amd64環境で頻繁に失敗していました。これは、複数のCPUコアが利用可能になり、ゴルーチンの実行がより並行的に行われることで、これまで表面化しなかったタイミングの問題が露呈したことを意味します。

この競合状態は、HTTPクライアントの信頼性とパフォーマンスに悪影響を及ぼす可能性があり、特に高負荷なWebサービスやマイクロサービス間通信において、接続のリーク、デッドロック、または予期せぬエラーの原因となるため、修正が急務でした。

前提知識の解説

1. net/http.Transport

net/http.Transport は、Goの標準ライブラリ net/http パッケージの一部であり、HTTPクライアントがネットワークリクエストを送信し、レスポンスを受信する際の低レベルな詳細を処理します。主な機能は以下の通りです。

  • 接続の確立と管理: TCP接続の確立、TLSハンドシェイク(HTTPSの場合)を行います。
  • 接続の再利用(キープアライブ): HTTP/1.1のキープアライブ機能を利用して、一度確立した接続を再利用し、新しいリクエストごとに接続を再確立するオーバーヘッドを削減します。これにより、パフォーマンスが向上します。アイドル状態の接続は内部の接続プールに保持されます。
  • プロキシのサポート: HTTPプロキシやSOCKSプロキシを介したリクエスト送信をサポートします。
  • リダイレクトの処理: リダイレクトを自動的に追跡します。
  • タイムアウト: 接続、リクエスト、レスポンスの各段階でのタイムアウトを設定できます。

http.Client は、この Transport を利用して高レベルなHTTPリクエストを構築・送信します。

2. GOMAXPROCS

GOMAXPROCS はGoランタイムが同時に実行できるOSスレッドの最大数を制御する環境変数です。

  • Go 1.5以前: デフォルト値は 1 でした。つまり、Goプログラムはデフォルトで1つのOSスレッドしか使用せず、複数のゴルーチンがあっても、それらはこの1つのスレッド上で多重化されて実行されていました。
  • Go 1.5以降: デフォルト値は利用可能な論理CPUの数になりました。これにより、Goプログラムはデフォルトで複数のCPUコアを最大限に活用し、ゴルーチンを真に並行して実行できるようになりました。

GOMAXPROCS の値が高いほど、複数のゴルーチンが同時に異なるCPUコアで実行される可能性が高まります。これはパフォーマンス向上に寄与しますが、同時に競合状態(レースコンディション)を顕在化させやすくもなります。なぜなら、複数のゴルーチンが共有リソースに同時にアクセスする機会が増え、予期せぬ順序で操作が行われることで、データの不整合やプログラムのクラッシュを引き起こす可能性があるためです。

3. ゴルーチン (Goroutines) と競合状態 (Race Conditions)

  • ゴルーチン: Goにおける軽量な並行実行単位です。OSスレッドよりもはるかに軽量で、数千、数万のゴルーチンを同時に実行することが可能です。ゴルーチンはGoランタイムによってスケジューリングされ、OSスレッド上で実行されます。
  • 競合状態 (Race Condition): 複数のゴルーチン(またはスレッド)が共有リソース(変数、データ構造など)に同時にアクセスし、少なくとも1つのアクセスが書き込みである場合に、それらのアクセスの最終的な結果が実行のタイミングに依存してしまう状態を指します。結果として、プログラムの動作が非決定論的になり、バグの特定と修正が困難になります。

このコミットは、GOMAXPROCS が高い場合に顕在化する net/http.Transport 内の競合状態を修正することを目的としています。これは、複数のゴルーチンが同時にHTTP接続プールや接続の内部状態を操作しようとした際に発生する問題です。

技術的詳細

このコミットは、net/http.Transport における複数の競合状態を、主に以下のメカニズムで修正しています。

1. アイドル接続プールの重複追加防止

Transport.putIdleConn メソッドは、使用済みでアイドル状態になった接続を接続プールに戻す役割を担います。このメソッドに以下のチェックが追加されました。

	for _, exist := range t.idleConn[key] {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}

このコードは、t.idleConn[key] (特定の接続キーに対応するアイドル接続のリスト) 内に、現在追加しようとしている pconn (永続接続) が既に存在しないかを確認します。もし重複が見つかった場合、log.Fatalf を呼び出してプログラムを終了させます。これは、同じ persistConn インスタンスが複数回アイドル接続プールに追加されるという競合状態が存在したことを示唆しています。このような重複は、接続プールの状態を破壊し、後続のリクエストで誤った接続が使用されたり、接続がリークしたりする原因となります。このチェックは、開発段階でこのような競合状態を早期に発見し、デバッグを容易にするためのアサーションとして機能します。

2. getIdleConn の到達不能パスの修正

Transport.getIdleConn メソッドは、アイドル接続プールから利用可能な接続を取得します。このメソッドの末尾が以下のように変更されました。

--- a/src/pkg/net/http/transport.go
+++ b/src/pkg/net/http/transport.go
@@ -289,7 +295,7 @@ func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) {
 			return
 		}
 	}
-	return
+	panic("unreachable")
 }

以前は、アイドル接続が見つからなかった場合に単に return していました。しかし、この変更により panic("unreachable") が追加されました。これは、getIdleConn が呼び出された時点で、何らかの理由でアイドル接続が取得できない状況は、本来発生してはならない「到達不能」な状態であるという設計意図を明確にしています。この変更自体が直接的な競合状態の修正というよりは、プログラムのロジックフローを厳密にし、予期せぬ状態を早期に検出するためのものです。

3. persistConn のクローズシグナル (closech) の導入

persistConn は、単一の永続的なHTTP接続を表す構造体です。この構造体に closech chan struct{} が追加されました。

type persistConn struct {
	// ...
	closech  chan struct{}       // broadcast close when readLoop (TCP connection) closes
}

このチャネルは、persistConn に関連付けられたTCP接続がクローズされたときに、他のゴルーチンにその事実を通知するために使用されます。

persistConn.readLoop メソッド(接続からの読み取りを処理するゴルーチン)の defer ステートメントに defer close(pc.closech) が追加されました。

func (pc *persistConn) readLoop() {
	defer close(pc.closech) // Ensures closech is closed when readLoop exits
	defer close(pc.writech)
	// ...
}

これにより、readLoop が終了する(つまり、基盤となるTCP接続がクローズされる)と同時に pc.closech がクローズされ、このチャネルをリッスンしている他のゴルーチンに接続が終了したことが通知されます。

4. roundTrip における接続クローズのハンドリング

persistConn.roundTrip メソッドは、単一のHTTPリクエストを永続接続経由で送信し、レスポンスを受信する役割を担います。このメソッドに pc.closech を監視する select ケースが追加されました。

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	// ...
	var pconnDeadCh = pc.closech
	var failTicker <-chan time.Time
WaitResponse:
	for {
		select {
		// ... existing cases ...
		case <-pconnDeadCh:
			// The persist connection is dead. This shouldn't
			// usually happen (only with Connection: close responses
			// with no response bodies), but if it does happen it
			// means either a) the remote server hung up on us
			// prematurely, or b) the readLoop sent us a response &
			// closed its closech at roughly the same time, and we
			// selected this case first, in which case a response
			// might still be coming soon.
			//
			// We can't avoid the select race in b) by using a unbuffered
			// resc channel instead, because then goroutines can
			// leak if we exit due to other errors.
			pconnDeadCh = nil                               // avoid spinning
			failTicker = time.After(100 * time.Millisecond) // arbitrary time to wait for resc
		case <-failTicker:
			re = responseAndError{nil, errors.New("net/http: transport closed before response was received")}
			break WaitResponse
		case re = <-resc:
			break WaitResponse
		}
	}
	// ...
}

この変更により、roundTrip はレスポンスを待っている間に pc.closech がクローズされたことを検出できるようになりました。これは、リモートサーバーが予期せず接続を切断した場合や、readLoop がレスポンスを送信するとほぼ同時に接続をクローズした場合に発生します。

  • <-pconnDeadCh が選択された場合、接続がデッドになったことを認識します。
  • pconnDeadCh = nil とすることで、このケースが再度選択されるのを防ぎます(スピニングを避けるため)。
  • failTicker = time.After(100 * time.Millisecond) を設定し、短い時間(100ms)だけレスポンスチャネル (resc) からのレスポンスを待ちます。これは、readLoop がレスポンスを送信し、ほぼ同時に closech をクローズするような競合状態に対応するためです。もしこの短い時間内にレスポンスが来なければ、接続がクローズされたことによるエラーを返します。

このメカニズムは、接続が予期せずクローズされた場合に roundTrip が無限にブロックされるのを防ぎ、適切なエラーを返すことで、クライアント側のロジックがより堅牢になるようにします。

5. bodyEOFSignalsync.Once によるコールバックの単一実行保証

bodyEOFSignal は、HTTPレスポンスボディの読み取りが完了したとき(EOFに達したとき)や、ボディがクローズされたときに特定のコールバック関数 (fn) を実行するためのヘルパー構造体です。以前の実装では、Read メソッドがEOFを返したときと、Close メソッドが呼び出されたときの両方で fn() が直接呼び出される可能性がありました。これにより、fn が複数回実行される競合状態が発生する可能性がありました。

このコミットでは、bodyEOFSignalsync.Once フィールドが追加され、condfn() という新しいヘルパーメソッドが導入されました。

type bodyEOFSignal struct {
	// ...
	once     sync.Once
}

func (es *bodyEOFSignal) condfn() {
	if es.fn != nil {
		es.once.Do(es.fn)
	}
}

そして、ReadClose メソッド内で es.fn() の直接呼び出しが es.condfn() に置き換えられました。

--- a/src/pkg/net/http/transport.go
+++ b/src/pkg/net/http/transport.go
@@ -769,9 +803,8 @@ func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
 	if es.isClosed && n > 0 {
 		panic("http: unexpected bodyEOFSignal Read after Close; see issue 1725")
 	}
-	if err == io.EOF && es.fn != nil {
-		es.fn()
-		es.fn = nil
+	if err == io.EOF {
+		es.condfn()
 	}
 	return
 }
@@ -782,13 +815,18 @@ func (es *bodyEOFSignal) Close() (err error) {
 	}
 	es.isClosed = true
 	err = es.body.Close()
-	if err == nil && es.fn != nil {
-		es.fn()
-		es.fn = nil
+	if err == nil {
+		es.condfn()
 	}
 	return
 }

sync.Once.Do(f) は、引数 f で指定された関数を、Do が呼び出された回数に関わらず、一度だけ実行することを保証します。これにより、es.fnRead または Close のどちらから呼び出されても、必ず1回だけ実行されるようになり、競合状態が解消されます。

6. waitForBodyRead チャネルのバッファリング

persistConn.readLoop 内で、レスポンスボディの読み取り完了を待つためのチャネル waitForBodyRead が、バッファなしチャネルからバッファ付きチャネル(容量1)に変更されました。

--- a/src/pkg/net/http/transport.go
+++ b/src/pkg/net/http/transport.go
@@ -578,7 +591,7 @@ func (pc *persistConn) readLoop() {
 		var waitForBodyRead chan bool
 		if hasBody {
 			lastbody = resp.Body
-			waitForBodyRead = make(chan bool)
+			waitForBodyRead = make(chan bool, 1)
 			resp.Body.(*bodyEOFSignal).fn = func() {
 				if alive && !pc.t.putIdleConn(pc) {
 					alive = false

バッファなしチャネルは、送信側と受信側が同時に準備できていないとブロックします。この場合、bodyEOFSignal.fn が呼び出された際に waitForBodyRead に送信しようとしても、受信側がまだ準備できていないとデッドロックやブロックが発生する可能性がありました。容量1のバッファ付きチャネルにすることで、送信側がブロックされることなく値を送信できるようになり、より堅牢な同期メカニズムが提供されます。

これらの変更は、Goの並行処理モデルにおけるチャネルとゴルーチンの利用をより安全かつ効率的にするための典型的なパターンを適用しています。特に sync.Once の使用は、コールバックの単一実行を保証する上で非常に重要です。

コアとなるコードの変更箇所

このコミットでは、主に以下の2つのファイルが変更されています。

  1. src/pkg/net/http/transport.go: net/http パッケージの Transport 実装のコアロジックが含まれるファイル。競合状態の修正と、接続管理の堅牢化に関する変更が集中しています。
    • 挿入: 56行
    • 削除: 6行
  2. src/pkg/net/http/transport_test.go: net/http パッケージのテストファイル。特に、高い GOMAXPROCS 環境下での並行処理の安定性を検証するための新しいテストが追加されています。
    • 挿入: 44行
    • 削除: 0行

コアとなるコードの解説

src/pkg/net/http/transport.go の変更点

  1. Transport.putIdleConn メソッドの変更:

    • アイドル接続プールに接続を追加する際に、同じ persistConn インスタンスが既にプールに存在しないかを確認するループが追加されました。
    • もし重複が見つかった場合、log.Fatalf を呼び出してプログラムを終了させます。これは、開発中に競合状態による重複追加を早期に検出するためのアサーションです。
    func (t *Transport) putIdleConn(pconn *persistConn) bool {
        // ... (既存のコード)
        for _, exist := range t.idleConn[key] {
            if exist == pconn {
                log.Fatalf("dup idle pconn %p in freelist", pconn) // 重複検出時のエラー
            }
        }
        t.idleConn[key] = append(t.idleConn[key], pconn)
        return true
    }
    
  2. Transport.getIdleConn メソッドの変更:

    • アイドル接続が見つからなかった場合の returnpanic("unreachable") に変更されました。これは、このパスが論理的に到達してはならない状態であることを示し、プログラムの堅牢性を高めます。
    func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) {
        // ... (既存のコード)
        panic("unreachable") // アイドル接続が見つからない場合はパニック
    }
    
  3. persistConn 構造体の変更:

    • closech chan struct{} フィールドが追加されました。これは、persistConn に関連付けられたTCP接続がクローズされたことを他のゴルーチンに通知するためのチャネルです。
    type persistConn struct {
        // ...
        closech  chan struct{}       // broadcast close when readLoop (TCP connection) closes
    }
    
  4. persistConn.readLoop メソッドの変更:

    • defer close(pc.closech) が追加され、readLoop ゴルーチンが終了する際に closech が確実にクローズされるようになりました。これにより、接続の終了が他のゴルーチンに正確に通知されます。
    • ReadResponse の呼び出しが if err == nil のガードで囲まれ、エラーが発生した場合に不必要な処理を避けるようになりました。
    • waitForBodyRead チャネルがバッファなし (make(chan bool)) からバッファ付き (make(chan bool, 1)) に変更されました。これにより、ボディの読み取り完了シグナルが送信される際に、受信側がまだ準備できていなくてもブロックされなくなり、デッドロックのリスクが軽減されます。
    func (pc *persistConn) readLoop() {
        defer close(pc.closech) // readLoop終了時にclosechをクローズ
        defer close(pc.writech)
        // ...
        var resp *Response
        if err == nil { // エラーがない場合のみReadResponseを試行
            resp, err = ReadResponse(pc.br, rc.req)
        }
        // ...
        if hasBody {
            lastbody = resp.Body
            waitForBodyRead = make(chan bool, 1) // バッファ付きチャネルに変更
            // ...
        }
        // ...
    }
    
  5. persistConn.roundTrip メソッドの変更:

    • select ステートメントに <-pconnDeadCh<-failTicker のケースが追加されました。
    • pconnDeadChpc.closech を監視し、接続がデッドになったことを検出します。
    • 接続がデッドになった場合、failTicker を設定して短い時間(100ms)だけレスポンスを待ちます。これは、レスポンスがほぼ同時に到着する可能性のある競合状態に対応するためです。
    • この時間内にレスポンスが来なければ、エラーを返して roundTrip を終了します。これにより、デッド接続で無限に待機するのを防ぎます。
    func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
        // ...
        var pconnDeadCh = pc.closech
        var failTicker <-chan time.Time
    WaitResponse:
        for {
            select {
            // ... (既存のケース)
            case <-pconnDeadCh: // 接続がデッドになった場合
                pconnDeadCh = nil
                failTicker = time.After(100 * time.Millisecond) // 短時間レスポンスを待つ
            case <-failTicker: // 短時間待ってもレスポンスが来ない場合
                re = responseAndError{nil, errors.New("net/http: transport closed before response was received")}
                break WaitResponse
            case re = <-resc: // レスポンスが来た場合
                break WaitResponse
            }
        }
        // ...
    }
    
  6. bodyEOFSignal 構造体と関連メソッドの変更:

    • sync.Once フィールド once が追加されました。
    • condfn() という新しいヘルパーメソッドが追加され、es.fn コールバックを es.once.Do(es.fn) を使って一度だけ実行することを保証します。
    • ReadClose メソッド内で es.fn() の直接呼び出しが es.condfn() に置き換えられました。これにより、fn が複数回呼び出される競合状態が解消されます。
    type bodyEOFSignal struct {
        // ...
        once     sync.Once // コールバックの単一実行を保証
    }
    
    func (es *bodyEOFSignal) condfn() {
        if es.fn != nil {
            es.once.Do(es.fn) // fnを一度だけ実行
        }
    }
    
    func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
        // ...
        if err == io.EOF {
            es.condfn() // condfnを呼び出す
        }
        return
    }
    
    func (es *bodyEOFSignal) Close() (err error) {
        // ...
        if err == nil {
            es.condfn() // condfnを呼び出す
        }
        return
    }
    

src/pkg/net/http/transport_test.go の変更点

  1. TestTransportConcurrency テスト関数の追加:

    • この新しいテストは、GOMAXPROCS16 に設定し、多数の並行HTTPリクエストを送信することで、Transport の並行処理能力と安定性を厳密にテストします。
    • httptest.NewServer を使用してテスト用のHTTPサーバーを起動し、クライアントが送信した echo パラメータをそのまま返すシンプルなハンドラを設定します。
    • numReqs (500) のリクエストを生成し、maxProcs*2 (32) のゴルーチンで並行してこれらのリクエストを処理します。
    • 各リクエストのレスポンスボディが期待通りであることを検証し、エラーが発生した場合はテストを失敗させます。
    • このテストは、以前のバージョンで頻繁に失敗していた競合状態を再現し、今回の修正によってそれが解消されたことを確認するために設計されました。
    func TestTransportConcurrency(t *testing.T) {
        const maxProcs = 16
        const numReqs = 500
        defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(maxProcs)) // GOMAXPROCSを16に設定
        ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
            fmt.Fprintf(w, "%v", r.FormValue("echo")) // echoパラメータを返す
        }))
        defer ts.Close()
        tr := &Transport{}
        c := &Client{Transport: tr}
        reqs := make(chan string)
        defer close(reqs)
    
        var wg sync.WaitGroup
        wg.Add(numReqs)
        for i := 0; i < maxProcs*2; i++ { // 多数のゴルーチンでリクエストを処理
            go func() {
                for req := range reqs {
                    res, err := c.Get(ts.URL + "/?echo=" + req)
                    // ... (エラーチェックとレスポンスボディの検証)
                    res.Body.Close()
                }
            }()
        }
        for i := 0; i < numReqs; i++ {
            reqs <- fmt.Sprintf("request-%d", i) // リクエストを送信
        }
        wg.Wait() // 全てのリクエストが完了するのを待つ
    }
    

これらの変更は、net/http.Transport の内部的な同期メカニズムを強化し、高並行環境下での信頼性と安定性を大幅に向上させるものです。特に sync.Once の導入やチャネルの適切な利用は、Goにおける並行処理のベストプラクティスを示しています。

関連リンク

  • Go CL (Code Review) リンク: https://golang.org/cl/6347061
  • Go Issue #3793: Fixes #3793 とコミットメッセージに記載されていますが、直接的なIssueページは検索結果からは見つかりませんでした。しかし、このコミットが解決しようとしている問題は、net/http.Transport における高 GOMAXPROCS 環境下での競合状態であることが明確です。

参考にした情報源リンク