[インデックス 13364] ファイルの概要
このコミットは、Go言語の標準ライブラリ net/http パッケージにおけるクライアントの挙動を改善するものです。具体的には、HTTPリクエストのボディを送信している最中にサーバーからレスポンスが返された場合(例えば、認証エラーなど)、以前はクライアントがそのレスポンスを適切に処理できず、書き込みエラー("broken pipe"など)として扱ってしまう問題がありました。このコミットにより、リクエストの書き込みとレスポンスの読み込みが並行して行われるようになり、このようなシナリオでもサーバーからのレスポンスを正確に受け取れるようになりました。
コミット
commit a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
Author: Brad Fitzpatrick <bradfitz@golang.org>
Date: Tue Jun 19 09:20:41 2012 -0700
net/http: make client await response concurrently with writing request
If the server replies with an HTTP response before we're done
writing our body (for instance "401 Unauthorized" response), we
were previously ignoring that, since we returned our write
error ("broken pipe", etc) before ever reading the response.
Now we read and write at the same time.
Fixes #3595
R=rsc, adg
CC=golang-dev
https://golang.org/cl/6238043
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
元コミット内容
net/http: クライアントがリクエストの書き込みと並行してレスポンスを待機するようにする
もしサーバーがリクエストボディの書き込みが完了する前にHTTPレスポンスを返した場合(例えば「401 Unauthorized」レスポンスなど)、以前はクライアントがそれを無視していました。なぜなら、レスポンスを読み込む前に書き込みエラー(「broken pipe」など)を返していたからです。 これで、読み込みと書き込みが同時に行われるようになります。
Fixes #3595
R=rsc, adg CC=golang-dev https://golang.org/cl/6238043
変更の背景
この変更の背景には、HTTPクライアントがリクエストボディをサーバーに送信している途中で、サーバーが早期にレスポンスを返すという特定のシナリオにおける問題がありました。
従来の net/http クライアントの実装では、HTTPリクエストの送信(特にボディの書き込み)とサーバーからのレスポンスの受信が、ある程度シーケンシャルに行われていました。つまり、クライアントはまずリクエスト全体を書き込もうとし、その書き込みが完了するか、あるいはエラーが発生するまで、サーバーからのレスポンスを待機しませんでした。
このシーケンシャルな挙動が問題となるのは、以下のようなケースです。
- 早期レスポンス: サーバーがリクエストボディの全てを受け取る前に、何らかの理由でレスポンスを返す場合。例えば、認証が必要なエンドポイントに対して認証情報なしで大きなボディをPOSTした場合、サーバーはボディの受信を待たずに「401 Unauthorized」のようなエラーレスポンスをすぐに返すことがあります。
- 「broken pipe」問題: 従来のクライアントは、リクエストボディの書き込み中にサーバーが接続を切断した場合(早期レスポンスを返した後など)、
writeシステムコールが「broken pipe」エラーを返していました。クライアントはこの書き込みエラーを即座に処理し、レスポンスを読み込むことなくエラーを返していました。 - 真のレスポンスの喪失: 結果として、クライアントはサーバーが意図した「401 Unauthorized」のような意味のあるレスポンスを受け取ることができず、単なる書き込みエラーとして処理してしまい、問題の根本原因を特定するのが困難でした。
このコミットは、このような状況下でもクライアントがサーバーからの早期レスポンスを適切に受け取り、処理できるようにするために導入されました。これにより、クライアントはより堅牢になり、サーバーの挙動に柔軟に対応できるようになります。
前提知識の解説
このコミットを理解するためには、以下の技術的知識が役立ちます。
-
HTTPプロトコル:
- リクエスト/レスポンスモデル: クライアントがリクエストを送信し、サーバーがレスポンスを返すという基本的なHTTPの通信モデル。
- リクエストボディ: POSTやPUTなどのメソッドで、クライアントがサーバーに送信するデータ本体。
- レスポンスステータスコード: サーバーがリクエストの結果を示す3桁の数字(例: 200 OK, 401 Unauthorized, 500 Internal Server Error)。
- TCPソケット: HTTP通信の基盤となるトランスポート層のプロトコル。データはストリームとして送受信されます。
-
Go言語の並行処理:
- Goroutine (ゴルーチン): Go言語における軽量なスレッドのようなもの。非常に低コストで多数生成でき、並行処理を実現します。
- Channel (チャネル): Goroutine間で安全にデータを送受信するための通信メカニズム。チャネルを介した通信は同期的に行われるため、データ競合を防ぎます。
selectステートメント: 複数のチャネル操作を同時に待機し、準備ができたチャネル操作を実行するためのGoの構文。非同期処理の調整に不可欠です。
-
net/httpパッケージ:Transport:net/httpパッケージにおいて、実際のHTTPリクエストの送信とレスポンスの受信を担当するインターフェース。コネクションの管理、リクエストの多重化、プロキシの処理などを行います。persistConn:Transportの内部で、単一のTCPコネクションを介して複数のHTTPリクエスト/レスポンスを処理するための構造体。HTTP/1.1のKeep-Alive機能を実現するために使用されます。readLoop:persistConn内で、サーバーからのレスポンスを継続的に読み込むためのGoroutine。roundTrip:Transportインターフェースの主要なメソッドで、単一のHTTPリクエストを送信し、対応するレスポンスを受け取る一連の処理をカプセル化します。
-
I/O操作とバッファリング:
io.Reader/io.Writer: Go言語における基本的なI/Oインターフェース。データの読み書き操作を抽象化します。bufio.Reader/bufio.Writer: バッファリングされたI/Oを提供する構造体。これにより、小さなI/O操作が多数発生するのを防ぎ、効率を向上させます。Flush()メソッドはバッファの内容を実際に書き込み先に送るために重要です。- 「broken pipe」エラー: パイプやソケットの書き込み側が、読み込み側がすでにクローズされている状態で書き込みを試みた場合に発生するエラー。TCPソケット通信では、サーバーが接続を閉じた後にクライアントが書き込みを続けると発生します。
これらの概念を理解することで、コミットがどのようにして既存の課題を解決し、net/http クライアントの堅牢性を向上させたのかを深く把握できます。
技術的詳細
このコミットの核心は、HTTPクライアントがリクエストの書き込みとレスポンスの読み込みを並行して行うための新しいメカニズムを導入した点にあります。これにより、サーバーがリクエストボディの受信完了を待たずに早期にレスポンスを返した場合でも、クライアントがそのレスポンスを適切に処理できるようになります。
具体的な技術的変更点は以下の通りです。
-
writechチャネルの導入:persistConn構造体にwritech chan writeRequestという新しいチャネルが追加されました。このチャネルは、roundTripゴルーチンからリクエストの書き込み要求を受け取り、writeLoopゴルーチンに渡す役割を担います。writeRequestは、書き込むリクエスト (*transportRequest) と、書き込み結果を返すためのチャネル (chan<- error) を含む新しい構造体です。
-
writeLoopゴルーチンの導入:persistConnの初期化時(getConnメソッド内)に、go pconn.writeLoop()という新しいゴルーチンが起動されるようになりました。- この
writeLoopゴルーチンは、pc.writechからwriteRequestを継続的に読み取ります。 writeRequestを受け取ると、その中のリクエスト (wr.req.Request) をpc.bw(バッファリングされたライター) を使って実際にコネクションに書き込みます。- 書き込みが完了したら、
pc.bw.Flush()を呼び出してバッファの内容を物理的なコネクションに送信します。 - 書き込み中にエラーが発生した場合(例: サーバーが接続を閉じたことによる「broken pipe」)、
pc.markBroken()を呼び出してコネクションを「壊れた」状態としてマークし、そのエラーをwriteRequestに含まれるチャネル (wr.ch) を通じてroundTripゴルーチンに返します。
-
roundTripメソッドの変更:- 以前は
roundTripメソッド内でリクエストの書き込みとフラッシュを直接行い、その後レスポンスを待機していました。 - 変更後、リクエストの書き込みは
writeLoopゴルーチンに委譲されるようになりました。具体的には、writeRequestを作成し、pc.writechに送信します。 - 最も重要な変更は、
selectステートメントの導入です。roundTripは、writeErrCh(書き込みエラーを受け取るチャネル) とresc(レスポンスを受け取るチャネル) の両方を同時に待機するようになりました。 - これにより、リクエストの書き込みが完了する前にサーバーからレスポンスが返された場合でも、
rescチャネルからのイベントが先に発生し、クライアントは早期レスポンスを即座に処理できるようになります。 - もし書き込みエラーが先に発生した場合、そのエラーが優先され、レスポンスの待機は中断されます。
- 以前は
-
markBrokenメソッドの導入:persistConnにmarkBroken()という新しいメソッドが追加されました。これは、コネクションが再利用できない状態になったことをマークしますが、基盤となるコネクション自体はすぐに閉じません。これは、readLoopがまだそのコネクションからデータを読み取っている可能性があるためです。
これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みという2つの独立したタスクを並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
コアとなるコードの変更箇所
このコミットによる主要なコード変更は、src/pkg/net/http/transport.go と src/pkg/net/http/transport_test.go の2つのファイルに集中しています。
src/pkg/net/http/transport.go
-
persistConn構造体へのwritechの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -487,7 +489,8 @@ type persistConn struct { closed bool // whether conn has been closed br *bufio.Reader // from conn bw *bufio.Writer // to conn - reqch chan requestAndChan // written by roundTrip(); read by readLoop() + reqch chan requestAndChan // written by roundTrip; read by readLoop + writech chan writeRequest // written by roundTrip; read by writeLoop isProxy boolpersistConnにwritechチャネルが追加され、リクエストの書き込み要求をwriteLoopに送るために使用されます。 -
getConnでwritechの初期化とwriteLoopの起動:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -323,6 +323,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { cacheKey: cm.String(), conn: conn, reqch: make(chan requestAndChan, 50),\n + writech: make(chan writeRequest, 50), } switch { @@ -380,6 +381,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { pconn.br = bufio.NewReader(pconn.conn) pconn.bw = bufio.NewWriter(pconn.conn) go pconn.readLoop() + go pconn.writeLoop() return pconn, nil }新しいコネクションが確立される際に、
writechが初期化され、writeLoopゴルーチンが起動されます。 -
readLoopでwritechをクローズ:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -519,6 +522,7 @@ func remoteSideClosed(err error) bool { } func (pc *persistConn) readLoop() { + defer close(pc.writech) alive := true var lastbody io.ReadCloser // last response body, if any, read on this connectionreadLoopが終了する際にwritechをクローズすることで、writeLoopが適切に終了できるようにします。 -
writeLoopゴルーチンの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -615,6 +619,23 @@ func (pc *persistConn) readLoop() { } } +func (pc *persistConn) writeLoop() { + for wr := range pc.writech { + if pc.isBroken() { + wr.ch <- errors.New("http: can't write HTTP request on broken connection") + continue + } + err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) + if err == nil { + err = pc.bw.Flush() + } + if err != nil { + pc.markBroken() + } + wr.ch <- err + } +} + type responseAndError struct { res *Response err errorリクエストの書き込みとフラッシュを担当する新しいゴルーチン
writeLoopが追加されました。 -
writeRequest構造体の追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -630,6 +651,15 @@ type requestAndChan struct { addedGzip bool } +// A writeRequest is sent by the readLoop's goroutine to the +// writeLoop's goroutine to write a request while the read loop +// concurrently waits on both the write response and the server's +// reply. +type writeRequest struct { + req *transportRequest + ch chan<- error +} + func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { if pc.mutateHeaderFunc != nil { pc.mutateHeaderFunc(req.extraHeaders())writeLoopとroundTrip間で書き込み要求をやり取りするためのwriteRequest構造体が定義されました。 -
roundTripメソッドの並行処理ロジックの変更:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -652,16 +682,29 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err pc.numExpectedResponses++ pc.lk.Unlock() - err = req.Request.write(pc.bw, pc.isProxy, req.extra) - if err != nil { - pc.close() - return + // Write the request concurrently with waiting for a response, + // in case the server decides to reply before reading our full + // request body. + writeErrCh := make(chan error, 1) + pc.writech <- writeRequest{req, writeErrCh} + + resc := make(chan responseAndError, 1) + pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} + + var re responseAndError +WaitResponse: + for { + select { + case err := <-writeErrCh: + if err != nil { + re = responseAndError{nil, err} + break WaitResponse + } + case re = <-resc: + break WaitResponse + } } - pc.bw.Flush() - ch := make(chan responseAndError, 1) - pc.reqch <- requestAndChan{req.Request, ch, requestedGzip} - re := <-ch pc.lk.Lock() pc.numExpectedResponses-- pc.lk.Unlock()リクエストの書き込みを
writechに送信し、writeErrChとrescの両方をselectで同時に待機するようになりました。 -
markBrokenメソッドの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -669,6 +712,15 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err return re.res, re.err } +// markBroken marks a connection as broken (so it's not reused). +// It differs from close in that it doesn't close the underlying +// connection for use when it's still being read. +func (pc *persistConn) markBroken() { + pc.lk.Lock() + defer pc.lk.Unlock() + pc.broken = true +} + func (pc *persistConn) close() { pc.lk.Lock() defer pc.lk.Unlock()コネクションを「壊れた」状態としてマークするためのヘルパーメソッドが追加されました。
src/pkg/net/http/transport_test.go
TestIssue3595テストケースの追加:
このテストケースは、サーバーがリクエストボディを完全に読み取る前にレスポンスを返した場合でも、クライアントがそのレスポンスを正しく受け取れることを検証します。特に、--- a/src/pkg/net/http/transport_test.go +++ b/src/pkg/net/http/transport_test.go @@ -833,6 +833,30 @@ func TestIssue3644(t *testing.T) { } } +// Test that a client receives a server's reply, even if the server doesn't read +// the entire request body. +func TestIssue3595(t *testing.T) { + const deniedMsg = "sorry, denied." + ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) { + Error(w, deniedMsg, StatusUnauthorized) + })) + defer ts.Close() + tr := &Transport{} + c := &Client{Transport: tr} + res, err := c.Post(ts.URL, "application/octet-stream", neverEnding('a')) + if err != nil { + t.Errorf("Post: %v", err) + return + } + got, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Fatalf("Body ReadAll: %v", err) + } + if !strings.Contains(string(got), deniedMsg) { + t.Errorf("Known bug: response %q does not contain %q", got, deniedMsg) + } +} + type fooProto struct{} func (fooProto) RoundTrip(req *Request) (*Response, error) {neverEnding('a')という無限のボディを送信し、サーバーが401 Unauthorizedを返すシナリオをシミュレートしています。
これらの変更は、net/http クライアントの堅牢性と信頼性を大幅に向上させるものです。
コアとなるコードの解説
このコミットのコアとなる変更は、net/http/transport.go 内の persistConn 構造体と、それに関連する roundTrip、readLoop、そして新しく追加された writeLoop ゴルーチンの協調動作にあります。
persistConn 構造体
writech chan writeRequest: この新しいチャネルは、roundTripメソッドからwriteLoopゴルーチンへのリクエスト書き込み要求を伝達するために使用されます。バッファリングされたチャネル(容量50)であるため、ある程度の要求をキューに入れることができます。
writeRequest 構造体
req *transportRequest: 実際に書き込むHTTPリクエストの情報を保持します。ch chan<- error: 書き込み操作の結果(エラーまたはnil)をroundTripメソッドにフィードバックするためのチャネルです。これにより、書き込みの成功/失敗をroundTripが知ることができます。
getConn 関数
persistConnのインスタンスが作成される際に、writechが初期化され、go pconn.writeLoop()が呼び出されます。これにより、コネクションごとに専用のwriteLoopゴルーチンが起動され、リクエストの書き込みを非同期で処理する準備が整います。
writeLoop ゴルーチン
このゴルーチンは、リクエストボディの実際の書き込みを担当します。
func (pc *persistConn) writeLoop() {
for wr := range pc.writech { // (1) writech から書き込み要求を読み取る
if pc.isBroken() { // (2) コネクションが壊れていないかチェック
wr.ch <- errors.New("http: can't write HTTP request on broken connection")
continue
}
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む
if err == nil {
err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信
}
if err != nil {
pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする
}
wr.ch <- err // (6) 結果を roundTrip に返す
}
}
for wr := range pc.writech:writechからwriteRequestを受け取るまでブロックします。チャネルがクローズされるとループを終了します。if pc.isBroken(): コネクションがすでに壊れている場合、書き込みは行わず、エラーを返します。wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra): HTTPリクエストのヘッダーとボディをbufio.Writer(pc.bw) に書き込みます。この時点ではまだ物理的なネットワークには送信されません。err = pc.bw.Flush():pc.bwにバッファリングされたデータを物理的なネットワークコネクションに送信します。ここで実際にデータがサーバーに送られます。if err != nil { pc.markBroken() }: 書き込みまたはフラッシュ中にエラーが発生した場合、そのコネクションを「壊れた」状態としてマークします。これにより、このコネクションが将来のHTTPリクエストに再利用されるのを防ぎます。wr.ch <- err: 書き込み操作の結果(エラーまたはnil)を、writeRequestに含まれるチャネルを通じてroundTripゴルーチンに返します。
roundTrip メソッド
このメソッドは、単一のHTTPリクエストを送信し、レスポンスを受け取るクライアント側の主要なロジックを含んでいます。
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
// ... (前略) ...
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル
pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信
resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル
pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信
var re responseAndError
WaitResponse:
for {
select { // (5) writeErrCh と resc の両方を同時に待機
case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合
if err != nil {
re = responseAndError{nil, err}
break WaitResponse
}
case re = <-resc: // (7) レスポンスが到着した場合
break WaitResponse
}
}
// ... (後略) ...
return re.res, re.err
}
writeErrCh := make(chan error, 1):writeLoopから書き込みエラーを受け取るためのチャネルを作成します。バッファリングされているため、writeLoopはroundTripがチャネルを読み取るのを待たずにエラーを送信できます。pc.writech <- writeRequest{req, writeErrCh}:writeRequestを作成し、pc.writechに送信します。これにより、writeLoopゴルーチンがこのリクエストの書き込みを開始します。この操作は非同期で行われます。resc := make(chan responseAndError, 1):readLoopからサーバーレスポンスを受け取るためのチャネルを作成します。pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}:readLoopゴルーチンに、このリクエストに対するレスポンスを待機するよう指示します。select { ... }: ここがこの変更の最も重要な部分です。roundTripは、writeErrChとrescの両方からのイベントを同時に待機します。case err := <-writeErrCh:: もしwriteLoopがリクエストの書き込み中にエラー(例: "broken pipe")を検出してwriteErrChに送信した場合、このケースが実行されます。roundTripはそのエラーを即座に処理し、レスポンスの待機を中断してエラーを返します。case re = <-resc:: もしreadLoopがサーバーからのレスポンスを先に受け取ってrescに送信した場合、このケースが実行されます。roundTripはレスポンスを処理し、書き込みエラーの発生を待たずにレスポンスを返します。
markBroken メソッド
pc.broken = true: コネクションが再利用できない状態であることを示すフラグを設定します。- このメソッドは、基盤となるTCPコネクションをすぐに閉じません。これは、
readLoopがまだそのコネクションから残りのデータを読み取っている可能性があるためです。コネクションの実際のクローズは、readLoopが終了するか、または他のメカニズムによって行われます。
これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みを真に並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
関連リンク
- Go CL (Change List): https://golang.org/cl/6238043
- GitHub Issue #3595: https://github.com/golang/go/issues/3595
参考にした情報源リンク
- Go言語の公式ドキュメント:
net/httpパッケージ - Go言語の並行処理に関するドキュメント(Goroutines, Channels, Select)
- HTTP/1.1 プロトコル仕様 (RFC 2616 または RFC 7230-7235)
- TCP/IP プロトコルに関する一般的な知識
bufioパッケージのドキュメントioパッケージのドキュメント- Go言語のテストフレームワーク
testingパッケージのドキュメント httptestパッケージのドキュメント
[インデックス 13364] ファイルの概要
このコミットは、Go言語の標準ライブラリ net/http パッケージにおけるクライアントの挙動を改善するものです。具体的には、HTTPリクエストのボディを送信している最中にサーバーからレスポンスが返された場合(例えば、認証エラーなど)、以前はクライアントがそのレスポンスを適切に処理できず、書き込みエラー("broken pipe"など)として扱ってしまう問題がありました。このコミットにより、リクエストの書き込みとレスポンスの読み込みが並行して行われるようになり、このようなシナリオでもサーバーからのレスポンスを正確に受け取れるようになりました。
コミット
commit a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
Author: Brad Fitzpatrick <bradfitz@golang.org>
Date: Tue Jun 19 09:20:41 2012 -0700
net/http: make client await response concurrently with writing request
If the server replies with an HTTP response before we're done
writing our body (for instance "401 Unauthorized" response), we
were previously ignoring that, since we returned our write
error ("broken pipe", etc) before ever reading the response.
Now we read and write at the same time.
Fixes #3595
R=rsc, adg
CC=golang-dev
https://golang.org/cl/6238043
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
元コミット内容
net/http: クライアントがリクエストの書き込みと並行してレスポンスを待機するようにする
もしサーバーがリクエストボディの書き込みが完了する前にHTTPレスポンスを返した場合(例えば「401 Unauthorized」レスポンスなど)、以前はクライアントがそれを無視していました。なぜなら、レスポンスを読み込む前に書き込みエラー(「broken pipe」など)を返していたからです。 これで、読み込みと書き込みが同時に行われるようになります。
Fixes #3595
R=rsc, adg CC=golang-dev https://golang.org/cl/6238043
変更の背景
この変更の背景には、HTTPクライアントがリクエストボディをサーバーに送信している途中で、サーバーが早期にレスポンスを返すという特定のシナリオにおける問題がありました。
従来の net/http クライアントの実装では、HTTPリクエストの送信(特にボディの書き込み)とサーバーからのレスポンスの受信が、ある程度シーケンシャルに行われていました。つまり、クライアントはまずリクエスト全体を書き込もうとし、その書き込みが完了するか、あるいはエラーが発生するまで、サーバーからのレスポンスを待機しませんでした。
このシーケンシャルな挙動が問題となるのは、以下のようなケースです。
- 早期レスポンス: サーバーがリクエストボディの全てを受け取る前に、何らかの理由でレスポンスを返す場合。例えば、認証が必要なエンドポイントに対して認証情報なしで大きなボディをPOSTした場合、サーバーはボディの受信を待たずに「401 Unauthorized」のようなエラーレスポンスをすぐに返すことがあります。
- 「broken pipe」問題: 従来のクライアントは、リクエストボディの書き込み中にサーバーが接続を切断した場合(早期レスポンスを返した後など)、
writeシステムコールが「broken pipe」エラーを返していました。クライアントはこの書き込みエラーを即座に処理し、レスポンスを読み込むことなくエラーを返していました。 - 真のレスポンスの喪失: 結果として、クライアントはサーバーが意図した「401 Unauthorized」のような意味のあるレスポンスを受け取ることができず、単なる書き込みエラーとして処理してしまい、問題の根本原因を特定するのが困難でした。
このコミットは、このような状況下でもクライアントがサーバーからの早期レスポンスを適切に受け取り、処理できるようにするために導入されました。これにより、クライアントはより堅牢になり、サーバーの挙動に柔軟に対応できるようになります。
前提知識の解説
このコミットを理解するためには、以下の技術的知識が役立ちます。
-
HTTPプロトコル:
- リクエスト/レスポンスモデル: クライアントがリクエストを送信し、サーバーがレスポンスを返すという基本的なHTTPの通信モデル。
- リクエストボディ: POSTやPUTなどのメソッドで、クライアントがサーバーに送信するデータ本体。
- レスポンスステータスコード: サーバーがリクエストの結果を示す3桁の数字(例: 200 OK, 401 Unauthorized, 500 Internal Server Error)。
- TCPソケット: HTTP通信の基盤となるトランスポート層のプロトコル。データはストリームとして送受信されます。
-
Go言語の並行処理:
- Goroutine (ゴルーチン): Go言語における軽量なスレッドのようなもの。非常に低コストで多数生成でき、並行処理を実現します。
- Channel (チャネル): Goroutine間で安全にデータを送受信するための通信メカニズム。チャネルを介した通信は同期的に行われるため、データ競合を防ぎます。
selectステートメント: 複数のチャネル操作を同時に待機し、準備ができたチャネル操作を実行するためのGoの構文。非同期処理の調整に不可欠です。
-
net/httpパッケージ:Transport:net/httpパッケージにおいて、実際のHTTPリクエストの送信とレスポンスの受信を担当するインターフェース。コネクションの管理、リクエストの多重化、プロキシの処理などを行います。persistConn:Transportの内部で、単一のTCPコネクションを介して複数のHTTPリクエスト/レスポンスを処理するための構造体。HTTP/1.1のKeep-Alive機能を実現するために使用されます。readLoop:persistConn内で、サーバーからのレスポンスを継続的に読み込むためのGoroutine。roundTrip:Transportインターフェースの主要なメソッドで、単一のHTTPリクエストを送信し、対応するレスポンスを受け取る一連の処理をカプセル化します。
-
I/O操作とバッファリング:
io.Reader/io.Writer: Go言語における基本的なI/Oインターフェース。データの読み書き操作を抽象化します。bufio.Reader/bufio.Writer: バッファリングされたI/Oを提供する構造体。これにより、小さなI/O操作が多数発生するのを防ぎ、効率を向上させます。Flush()メソッドはバッファの内容を実際に書き込み先に送るために重要です。- 「broken pipe」エラー: パイプやソケットの書き込み側が、読み込み側がすでにクローズされている状態で書き込みを試みた場合に発生するエラー。TCPソケット通信では、サーバーが接続を閉じた後にクライアントが書き込みを続けると発生します。
これらの概念を理解することで、コミットがどのようにして既存の課題を解決し、net/http クライアントの堅牢性を向上させたのかを深く把握できます。
技術的詳細
このコミットの核心は、HTTPクライアントがリクエストの書き込みとレスポンスの読み込みを並行して行うための新しいメカニズムを導入した点にあります。これにより、サーバーがリクエストボディの受信完了を待たずに早期にレスポンスを返した場合でも、クライアントがそのレスポンスを適切に処理できるようになります。
具体的な技術的変更点は以下の通りです。
-
writechチャネルの導入:persistConn構造体にwritech chan writeRequestという新しいチャネルが追加されました。このチャネルは、roundTripゴルーチンからリクエストの書き込み要求を受け取り、writeLoopゴルーチンに渡す役割を担います。writeRequestは、書き込むリクエスト (*transportRequest) と、書き込み結果を返すためのチャネル (chan<- error) を含む新しい構造体です。
-
writeLoopゴルーチンの導入:persistConnの初期化時(getConnメソッド内)に、go pconn.writeLoop()という新しいゴルーチンが起動されるようになりました。- この
writeLoopゴルーチンは、pc.writechからwriteRequestを継続的に読み取ります。 writeRequestを受け取ると、その中のリクエスト (wr.req.Request) をpc.bw(バッファリングされたライター) を使って実際にコネクションに書き込みます。- 書き込みが完了したら、
pc.bw.Flush()を呼び出してバッファの内容を物理的なコネクションに送信します。 - 書き込み中にエラーが発生した場合(例: サーバーが接続を閉じたことによる「broken pipe」)、
pc.markBroken()を呼び出してコネクションを「壊れた」状態としてマークし、そのエラーをwriteRequestに含まれるチャネル (wr.ch) を通じてroundTripゴルーチンに返します。
-
roundTripメソッドの変更:- 以前は
roundTripメソッド内でリクエストの書き込みとフラッシュを直接行い、その後レスポンスを待機していました。 - 変更後、リクエストの書き込みは
writeLoopゴルーチンに委譲されるようになりました。具体的には、writeRequestを作成し、pc.writechに送信します。 - 最も重要な変更は、
selectステートメントの導入です。roundTripは、writeErrCh(書き込みエラーを受け取るチャネル) とresc(レスポンスを受け取るチャネル) の両方を同時に待機するようになりました。 - これにより、リクエストの書き込みが完了する前にサーバーからレスポンスが返された場合でも、
rescチャネルからのイベントが先に発生し、クライアントは早期レスポンスを即座に処理できるようになります。 - もし書き込みエラーが先に発生した場合、そのエラーが優先され、レスポンスの待機は中断されます。
- 以前は
-
markBrokenメソッドの導入:persistConnにmarkBroken()という新しいメソッドが追加されました。これは、コネクションが再利用できない状態になったことをマークしますが、基盤となるコネクション自体はすぐに閉じません。これは、readLoopがまだそのコネクションからデータを読み取っている可能性があるためです。
これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みという2つの独立したタスクを並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
コアとなるコードの変更箇所
このコミットによる主要なコード変更は、src/pkg/net/http/transport.go と src/pkg/net/http/transport_test.go の2つのファイルに集中しています。
src/pkg/net/http/transport.go
-
persistConn構造体へのwritechの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -487,7 +489,8 @@ type persistConn struct { closed bool // whether conn has been closed br *bufio.Reader // from conn bw *bufio.Writer // to conn - reqch chan requestAndChan // written by roundTrip(); read by readLoop() + reqch chan requestAndChan // written by roundTrip; read by readLoop + writech chan writeRequest // written by roundTrip; read by writeLoop isProxy boolpersistConnにwritechチャネルが追加され、リクエストの書き込み要求をwriteLoopに送るために使用されます。 -
getConnでwritechの初期化とwriteLoopの起動:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -323,6 +323,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { cacheKey: cm.String(), conn: conn, reqch: make(chan requestAndChan, 50),\n + writech: make(chan writeRequest, 50), } switch { @@ -380,6 +381,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { pconn.br = bufio.NewReader(pconn.conn) pconn.bw = bufio.NewWriter(pconn.conn) go pconn.readLoop() + go pconn.writeLoop() return pconn, nil }新しいコネクションが確立される際に、
writechが初期化され、writeLoopゴルーチンが起動されます。 -
readLoopでwritechをクローズ:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -519,6 +522,7 @@ func remoteSideClosed(err error) bool { } func (pc *persistConn) readLoop() { + defer close(pc.writech) alive := true var lastbody io.ReadCloser // last response body, if any, read on this connectionreadLoopが終了する際にwritechをクローズすることで、writeLoopが適切に終了できるようにします。 -
writeLoopゴルーチンの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -615,6 +619,23 @@ func (pc *persistConn) readLoop() { } } +func (pc *persistConn) writeLoop() { + for wr := range pc.writech { // (1) writech から書き込み要求を読み取る + if pc.isBroken() { // (2) コネクションが壊れていないかチェック + wr.ch <- errors.New("http: can't write HTTP request on broken connection") + continue + } + err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む + if err == nil { + err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信 + } + if err != nil { + pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする + } + wr.ch <- err // (6) 結果を roundTrip に返す + } +} + type responseAndError struct { res *Response err errorリクエストの書き込みとフラッシュを担当する新しいゴルーチン
writeLoopが追加されました。 -
writeRequest構造体の追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -630,6 +651,15 @@ type requestAndChan struct { addedGzip bool } +// A writeRequest is sent by the readLoop's goroutine to the +// writeLoop's goroutine to write a request while the read loop +// concurrently waits on both the write response and the server's +// reply. +type writeRequest struct { + req *transportRequest + ch chan<- error +} + func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { if pc.mutateHeaderFunc != nil { pc.mutateHeaderFunc(req.extraHeaders())writeLoopとroundTrip間で書き込み要求をやり取りするためのwriteRequest構造体が定義されました。 -
roundTripメソッドの並行処理ロジックの変更:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -652,16 +682,29 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err pc.numExpectedResponses++ pc.lk.Unlock() - err = req.Request.write(pc.bw, pc.isProxy, req.extra) - if err != nil { - pc.close() - return + // Write the request concurrently with waiting for a response, + // in case the server decides to reply before reading our full + // request body. + writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル + pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信 + + resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル + pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信 + + var re responseAndError +WaitResponse: + for { + select { // (5) writeErrCh と resc の両方を同時に待機 + case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合 + if err != nil { + re = responseAndError{nil, err} + break WaitResponse + } + case re = <-resc: // (7) レスポンスが到着した場合 + break WaitResponse + } } - pc.bw.Flush() - ch := make(chan responseAndError, 1) - pc.reqch <- requestAndChan{req.Request, ch, requestedGzip} - re := <-ch pc.lk.Lock() pc.numExpectedResponses-- pc.lk.Unlock()リクエストの書き込みを
writechに送信し、writeErrChとrescの両方をselectで同時に待機するようになりました。 -
markBrokenメソッドの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -669,6 +712,15 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err return re.res, re.err } +// markBroken marks a connection as broken (so it's not reused). +// It differs from close in that it doesn't close the underlying +// connection for use when it's still being read. +func (pc *persistConn) markBroken() { + pc.lk.Lock() + defer pc.lk.Unlock() + pc.broken = true +} + func (pc *persistConn) close() { pc.lk.Lock() defer pc.lk.Unlock()コネクションを「壊れた」状態としてマークするためのヘルパーメソッドが追加されました。
src/pkg/net/http/transport_test.go
TestIssue3595テストケースの追加:
このテストケースは、サーバーがリクエストボディを完全に読み取る前にレスポンスを返した場合でも、クライアントがそのレスポンスを正しく受け取れることを検証します。特に、--- a/src/pkg/net/http/transport_test.go +++ b/src/pkg/net/http/transport_test.go @@ -833,6 +833,30 @@ func TestIssue3644(t *testing.T) { } } +// Test that a client receives a server's reply, even if the server doesn't read +// the entire request body. +func TestIssue3595(t *testing.T) { + const deniedMsg = "sorry, denied." + ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) { + Error(w, deniedMsg, StatusUnauthorized) + })) + defer ts.Close() + tr := &Transport{} + c := &Client{Transport: tr} + res, err := c.Post(ts.URL, "application/octet-stream", neverEnding('a')) + if err != nil { + t.Errorf("Post: %v", err) + return + } + got, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Fatalf("Body ReadAll: %v", err) + } + if !strings.Contains(string(got), deniedMsg) { + t.Errorf("Known bug: response %q does not contain %q", got, deniedMsg) + } +} + type fooProto struct{} func (fooProto) RoundTrip(req *Request) (*Response, error) {neverEnding('a')という無限のボディを送信し、サーバーが401 Unauthorizedを返すシナリオをシミュレートしています。
これらの変更は、net/http クライアントの堅牢性と信頼性を大幅に向上させるものです。
コアとなるコードの解説
このコミットのコアとなる変更は、net/http/transport.go 内の persistConn 構造体と、それに関連する roundTrip、readLoop、そして新しく追加された writeLoop ゴルーチンの協調動作にあります。
persistConn 構造体
writech chan writeRequest: この新しいチャネルは、roundTripメソッドからwriteLoopゴルーチンへのリクエスト書き込み要求を伝達するために使用されます。バッファリングされたチャネル(容量50)であるため、ある程度の要求をキューに入れることができます。
writeRequest 構造体
req *transportRequest: 実際に書き込むHTTPリクエストの情報を保持します。ch chan<- error: 書き込み操作の結果(エラーまたはnil)をroundTripメソッドにフィードバックするためのチャネルです。これにより、書き込みの成功/失敗をroundTripが知ることができます。
getConn 関数
persistConnのインスタンスが作成される際に、writechが初期化され、go pconn.writeLoop()が呼び出されます。これにより、コネクションごとに専用のwriteLoopゴルーチンが起動され、リクエストの書き込みを非同期で処理する準備が整います。
writeLoop ゴルーチン
このゴルーチンは、リクエストボディの実際の書き込みを担当します。
func (pc *persistConn) writeLoop() {
for wr := range pc.writech { // (1) writech から書き込み要求を読み取る
if pc.isBroken() { // (2) コネクションが壊れていないかチェック
wr.ch <- errors.New("http: can't write HTTP request on broken connection")
continue
}
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む
if err == nil {
err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信
}
if err != nil {
pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする
}
wr.ch <- err // (6) 結果を roundTrip に返す
}
}
for wr := range pc.writech:writechからwriteRequestを受け取るまでブロックします。チャネルがクローズされるとループを終了します。if pc.isBroken(): コネクションがすでに壊れている場合、書き込みは行わず、エラーを返します。wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra): HTTPリクエストのヘッダーとボディをbufio.Writer(pc.bw) に書き込みます。この時点ではまだ物理的なネットワークには送信されません。err = pc.bw.Flush():pc.bwにバッファリングされたデータを物理的なネットワークコネクションに送信します。ここで実際にデータがサーバーに送られます。if err != nil { pc.markBroken() }: 書き込みまたはフラッシュ中にエラーが発生した場合、そのコネクションを「壊れた」状態としてマークします。これにより、このコネクションが将来のHTTPリクエストに再利用されるのを防ぎます。wr.ch <- err: 書き込み操作の結果(エラーまたはnil)を、writeRequestに含まれるチャネルを通じてroundTripゴルーチンに返します。
roundTrip メソッド
このメソッドは、単一のHTTPリクエストを送信し、レスポンスを受け取るクライアント側の主要なロジックを含んでいます。
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
// ... (前略) ...
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル
pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信
resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル
pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信
var re responseAndError
WaitResponse:
for {
select { // (5) writeErrCh と resc の両方を同時に待機
case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合
if err != nil {
re = responseAndError{nil, err}
break WaitResponse
}
case re = <-resc: // (7) レスポンスが到着した場合
break WaitResponse
}
}
// ... (後略) ...
return re.res, re.err
}
writeErrCh := make(chan error, 1):writeLoopから書き込みエラーを受け取るためのチャネルを作成します。バッファリングされているため、writeLoopはroundTripがチャネルを読み取るのを待たずにエラーを送信できます。pc.writech <- writeRequest{req, writeErrCh}:writeRequestを作成し、pc.writechに送信します。これにより、writeLoopゴルーチンがこのリクエストの書き込みを開始します。この操作は非同期で行われます。resc := make(chan responseAndError, 1):readLoopからサーバーレスポンスを受け取るためのチャネルを作成します。pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}:readLoopゴルーチンに、このリクエストに対するレスポンスを待機するよう指示します。select { ... }: ここがこの変更の最も重要な部分です。roundTripは、writeErrChとrescの両方からのイベントを同時に待機します。case err := <-writeErrCh:: もしwriteLoopがリクエストの書き込み中にエラー(例: "broken pipe")を検出してwriteErrChに送信した場合、このケースが実行されます。roundTripはそのエラーを即座に処理し、レスポンスの待機を中断してエラーを返します。case re = <-resc:: もしreadLoopがサーバーからのレスポンスを先に受け取ってrescに送信した場合、このケースが実行されます。roundTripはレスポンスを処理し、書き込みエラーの発生を待たずにレスポンスを返します。
markBroken メソッド
pc.broken = true: コネクションが再利用できない状態であることを示すフラグを設定します。- このメソッドは、基盤となるTCPコネクションをすぐに閉じません。これは、
readLoopがまだそのコネクションから残りのデータを読み取っている可能性があるためです。コネクションの実際のクローズは、readLoopが終了するか、または他のメカニズムによって行われます。
これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みを真に並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
関連リンク
- Go CL (Change List): https://golang.org/cl/6238043
- GitHub Issue #3595: https://github.com/golang/go/issues/3595
参考にした情報源リンク
- Go言語の公式ドキュメント:
net/httpパッケージ - Go言語の並行処理に関するドキュメント(Goroutines, Channels, Select)
- HTTP/1.1 プロトコル仕様 (RFC 2616 または RFC 7230-7235)
- TCP/IP プロトコルに関する一般的な知識
bufioパッケージのドキュメントioパッケージのドキュメント- Go言語のテストフレームワーク
testingパッケージのドキュメント httptestパッケージのドキュメント- Web検索結果: "Go net/http Issue 3595" (このコミットが修正した問題に関連する議論や、
rstAvoidanceDelayなどの関連概念について言及されている可能性があります。)
[インデックス 13364] ファイルの概要
このコミットは、Go言語の標準ライブラリ net/http パッケージにおけるクライアントの挙動を改善するものです。具体的には、HTTPリクエストのボディを送信している最中にサーバーからレスポンスが返された場合(例えば、認証エラーなど)、以前はクライアントがそのレスポンスを適切に処理できず、書き込みエラー("broken pipe"など)として扱ってしまう問題がありました。このコミットにより、リクエストの書き込みとレスポンスの読み込みが並行して行われるようになり、このようなシナリオでもサーバーからのレスポンスを正確に受け取れるようになりました。
コミット
commit a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
Author: Brad Fitzpatrick <bradfitz@golang.org>
Date: Tue Jun 19 09:20:41 2012 -0700
net/http: make client await response concurrently with writing request
If the server replies with an HTTP response before we're done
writing our body (for instance "401 Unauthorized" response), we
were previously ignoring that, since we returned our write
error ("broken pipe", etc) before ever reading the response.
Now we read and write at the same time.
Fixes #3595
R=rsc, adg
CC=golang-dev
https://golang.org/cl/6238043
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
元コミット内容
net/http: クライアントがリクエストの書き込みと並行してレスポンスを待機するようにする
もしサーバーがリクエストボディの書き込みが完了する前にHTTPレスポンスを返した場合(例えば「401 Unauthorized」レスポンスなど)、以前はクライアントがそれを無視していました。なぜなら、レスポンスを読み込む前に書き込みエラー(「broken pipe」など)を返していたからです。 これで、読み込みと書き込みが同時に行われるようになります。
Fixes #3595
R=rsc, adg CC=golang-dev https://golang.org/cl/6238043
変更の背景
この変更の背景には、HTTPクライアントがリクエストボディをサーバーに送信している途中で、サーバーが早期にレスポンスを返すという特定のシナリオにおける問題がありました。
従来の net/http クライアントの実装では、HTTPリクエストの送信(特にボディの書き込み)とサーバーからのレスポンスの受信が、ある程度シーケンシャルに行われていました。つまり、クライアントはまずリクエスト全体を書き込もうとし、その書き込みが完了するか、あるいはエラーが発生するまで、サーバーからのレスポンスを待機しませんでした。
このシーケンシャルな挙動が問題となるのは、以下のようなケースです。
- 早期レスポンス: サーバーがリクエストボディの全てを受け取る前に、何らかの理由でレスポンスを返す場合。例えば、認証が必要なエンドポイントに対して認証情報なしで大きなボディをPOSTした場合、サーバーはボディの受信を待たずに「401 Unauthorized」のようなエラーレスポンスをすぐに返すことがあります。
- 「broken pipe」問題: 従来のクライアントは、リクエストボディの書き込み中にサーバーが接続を切断した場合(早期レスポンスを返した後など)、
writeシステムコールが「broken pipe」エラーを返していました。クライアントはこの書き込みエラーを即座に処理し、レスポンスを読み込むことなくエラーを返していました。 - 真のレスポンスの喪失: 結果として、クライアントはサーバーが意図した「401 Unauthorized」のような意味のあるレスポンスを受け取ることができず、単なる書き込みエラーとして処理してしまい、問題の根本原因を特定するのが困難でした。
このコミットは、このような状況下でもクライアントがサーバーからの早期レスポンスを適切に受け取り、処理できるようにするために導入されました。これにより、クライアントはより堅牢になり、サーバーの挙動に柔軟に対応できるようになります。
前提知識の解説
このコミットを理解するためには、以下の技術的知識が役立ちます。
-
HTTPプロトコル:
- リクエスト/レスポンスモデル: クライアントがリクエストを送信し、サーバーがレスポンスを返すという基本的なHTTPの通信モデル。
- リクエストボディ: POSTやPUTなどのメソッドで、クライアントがサーバーに送信するデータ本体。
- レスポンスステータスコード: サーバーがリクエストの結果を示す3桁の数字(例: 200 OK, 401 Unauthorized, 500 Internal Server Error)。
- TCPソケット: HTTP通信の基盤となるトランスポート層のプロトコル。データはストリームとして送受信されます。
-
Go言語の並行処理:
- Goroutine (ゴルーチン): Go言語における軽量なスレッドのようなもの。非常に低コストで多数生成でき、並行処理を実現します。
- Channel (チャネル): Goroutine間で安全にデータを送受信するための通信メカニズム。チャネルを介した通信は同期的に行われるため、データ競合を防ぎます。
selectステートメント: 複数のチャネル操作を同時に待機し、準備ができたチャネル操作を実行するためのGoの構文。非同期処理の調整に不可欠です。
-
net/httpパッケージ:Transport:net/httpパッケージにおいて、実際のHTTPリクエストの送信とレスポンスの受信を担当するインターフェース。コネクションの管理、リクエストの多重化、プロキシの処理などを行います。persistConn:Transportの内部で、単一のTCPコネクションを介して複数のHTTPリクエスト/レスポンスを処理するための構造体。HTTP/1.1のKeep-Alive機能を実現するために使用されます。readLoop:persistConn内で、サーバーからのレスポンスを継続的に読み込むためのGoroutine。roundTrip:Transportインターフェースの主要なメソッドで、単一のHTTPリクエストを送信し、対応するレスポンスを受け取る一連の処理をカプセル化します。
-
I/O操作とバッファリング:
io.Reader/io.Writer: Go言語における基本的なI/Oインターフェース。データの読み書き操作を抽象化します。bufio.Reader/bufio.Writer: バッファリングされたI/Oを提供する構造体。これにより、小さなI/O操作が多数発生するのを防ぎ、効率を向上させます。Flush()メソッドはバッファの内容を実際に書き込み先に送るために重要です。- 「broken pipe」エラー: パイプやソケットの書き込み側が、読み込み側がすでにクローズされている状態で書き込みを試みた場合に発生するエラー。TCPソケット通信では、サーバーが接続を閉じた後にクライアントが書き込みを続けると発生します。
これらの概念を理解することで、コミットがどのようにして既存の課題を解決し、net/http クライアントの堅牢性を向上させたのかを深く把握できます。
技術的詳細
このコミットの核心は、HTTPクライアントがリクエストの書き込みとレスポンスの読み込みを並行して行うための新しいメカニズムを導入した点にあります。これにより、サーバーがリクエストボディの受信完了を待たずに早期にレスポンスを返した場合でも、クライアントがそのレスポンスを適切に処理できるようになります。
具体的な技術的変更点は以下の通りです。
-
writechチャネルの導入:persistConn構造体にwritech chan writeRequestという新しいチャネルが追加されました。このチャネルは、roundTripゴルーチンからリクエストの書き込み要求を受け取り、writeLoopゴルーチンに渡す役割を担います。writeRequestは、書き込むリクエスト (*transportRequest) と、書き込み結果を返すためのチャネル (chan<- error) を含む新しい構造体です。
-
writeLoopゴルーチンの導入:persistConnの初期化時(getConnメソッド内)に、go pconn.writeLoop()という新しいゴルーチンが起動されるようになりました。- この
writeLoopゴルーチンは、pc.writechからwriteRequestを継続的に読み取ります。 writeRequestを受け取ると、その中のリクエスト (wr.req.Request) をpc.bw(バッファリングされたライター) を使って実際にコネクションに書き込みます。- 書き込みが完了したら、
pc.bw.Flush()を呼び出してバッファの内容を物理的なコネクションに送信します。 - 書き込み中にエラーが発生した場合(例: サーバーが接続を閉じたことによる「broken pipe」)、
pc.markBroken()を呼び出してコネクションを「壊れた」状態としてマークし、そのエラーをwriteRequestに含まれるチャネル (wr.ch) を通じてroundTripゴルーチンに返します。
-
roundTripメソッドの変更:- 以前は
roundTripメソッド内でリクエストの書き込みとフラッシュを直接行い、その後レスポンスを待機していました。 - 変更後、リクエストの書き込みは
writeLoopゴルーチンに委譲されるようになりました。具体的には、writeRequestを作成し、pc.writechに送信します。 - 最も重要な変更は、
selectステートメントの導入です。roundTripは、writeErrCh(書き込みエラーを受け取るチャネル) とresc(レスポンスを受け取るチャネル) の両方を同時に待機するようになりました。 - これにより、リクエストの書き込みが完了する前にサーバーからレスポンスが返された場合でも、
rescチャネルからのイベントが先に発生し、クライアントは早期レスポンスを即座に処理できるようになります。 - もし書き込みエラーが先に発生した場合、そのエラーが優先され、レスポンスの待機は中断されます。
- 以前は
-
markBrokenメソッドの導入:persistConnにmarkBroken()という新しいメソッドが追加されました。これは、コネクションが再利用できない状態になったことをマークしますが、基盤となるコネクション自体はすぐに閉じません。これは、readLoopがまだそのコネクションからデータを読み取っている可能性があるためです。
これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みという2つの独立したタスクを並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
コアとなるコードの変更箇所
このコミットによる主要なコード変更は、src/pkg/net/http/transport.go と src/pkg/net/http/transport_test.go の2つのファイルに集中しています。
src/pkg/net/http/transport.go
-
persistConn構造体へのwritechの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -487,7 +489,8 @@ type persistConn struct { closed bool // whether conn has been closed br *bufio.Reader // from conn bw *bufio.Writer // to conn - reqch chan requestAndChan // written by roundTrip(); read by readLoop() + reqch chan requestAndChan // written by roundTrip; read by readLoop + writech chan writeRequest // written by roundTrip; read by writeLoop isProxy boolpersistConnにwritechチャネルが追加され、リクエストの書き込み要求をwriteLoopに送るために使用されます。 -
getConnでwritechの初期化とwriteLoopの起動:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -323,6 +323,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { cacheKey: cm.String(), conn: conn, reqch: make(chan requestAndChan, 50),\n + writech: make(chan writeRequest, 50), } switch { @@ -380,6 +381,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { pconn.br = bufio.NewReader(pconn.conn) pconn.bw = bufio.NewWriter(pconn.conn) go pconn.readLoop() + go pconn.writeLoop() return pconn, nil }新しいコネクションが確立される際に、
writechが初期化され、writeLoopゴルーチンが起動されます。 -
readLoopでwritechをクローズ:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -519,6 +522,7 @@ func remoteSideClosed(err error) bool { } func (pc *persistConn) readLoop() { + defer close(pc.writech) alive := true var lastbody io.ReadCloser // last response body, if any, read on this connectionreadLoopが終了する際にwritechをクローズすることで、writeLoopが適切に終了できるようにします。 -
writeLoopゴルーチンの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -615,6 +619,23 @@ func (pc *persistConn) readLoop() { } } +func (pc *persistConn) writeLoop() { + for wr := range pc.writech { // (1) writech から書き込み要求を読み取る + if pc.isBroken() { // (2) コネクションが壊れていないかチェック + wr.ch <- errors.New("http: can't write HTTP request on broken connection") + continue + } + err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む + if err == nil { + err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信 + } + if err != nil { + pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする + } + wr.ch <- err // (6) 結果を roundTrip に返す + } +} + type responseAndError struct { res *Response err errorリクエストの書き込みとフラッシュを担当する新しいゴルーチン
writeLoopが追加されました。 -
writeRequest構造体の追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -630,6 +651,15 @@ type requestAndChan struct { addedGzip bool } +// A writeRequest is sent by the readLoop's goroutine to the +// writeLoop's goroutine to write a request while the read loop +// concurrently waits on both the write response and the server's +// reply. +type writeRequest struct { + req *transportRequest + ch chan<- error +} + func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { if pc.mutateHeaderFunc != nil { pc.mutateHeaderFunc(req.extraHeaders())writeLoopとroundTrip間で書き込み要求をやり取りするためのwriteRequest構造体が定義されました。 -
roundTripメソッドの並行処理ロジックの変更:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -652,16 +682,29 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err pc.numExpectedResponses++ pc.lk.Unlock() - err = req.Request.write(pc.bw, pc.isProxy, req.extra) - if err != nil { - pc.close() - return + // Write the request concurrently with waiting for a response, + // in case the server decides to reply before reading our full + // request body. + writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル + pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信 + + resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル + pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信 + + var re responseAndError +WaitResponse: + for { + select { // (5) writeErrCh と resc の両方を同時に待機 + case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合 + if err != nil { + re = responseAndError{nil, err} + break WaitResponse + } + case re = <-resc: // (7) レスポンスが到着した場合 + break WaitResponse + } } - pc.bw.Flush() - ch := make(chan responseAndError, 1) - pc.reqch <- requestAndChan{req.Request, ch, requestedGzip} - re := <-ch pc.lk.Lock() pc.numExpectedResponses-- pc.lk.Unlock()リクエストの書き込みを
writechに送信し、writeErrChとrescの両方をselectで同時に待機するようになりました。 -
markBrokenメソッドの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -669,6 +712,15 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err return re.res, re.err } +// markBroken marks a connection as broken (so it's not reused). +// It differs from close in that it doesn't close the underlying +// connection for use when it's still being read. +func (pc *persistConn) markBroken() { + pc.lk.Lock() + defer pc.lk.Unlock() + pc.broken = true +} + func (pc *persistConn) close() { pc.lk.Lock() defer pc.lk.Unlock()コネクションを「壊れた」状態としてマークするためのヘルパーメソッドが追加されました。
src/pkg/net/http/transport_test.go
TestIssue3595テストケースの追加:
このテストケースは、サーバーがリクエストボディを完全に読み取る前にレスポンスを返した場合でも、クライアントがそのレスポンスを正しく受け取れることを検証します。特に、--- a/src/pkg/net/http/transport_test.go +++ b/src/pkg/net/http/transport_test.go @@ -833,6 +833,30 @@ func TestIssue3644(t *testing.T) { } } +// Test that a client receives a server's reply, even if the server doesn't read +// the entire request body. +func TestIssue3595(t *testing.T) { + const deniedMsg = "sorry, denied." + ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) { + Error(w, deniedMsg, StatusUnauthorized) + })) + defer ts.Close() + tr := &Transport{} + c := &Client{Transport: tr} + res, err := c.Post(ts.URL, "application/octet-stream", neverEnding('a')) + if err != nil { + t.Errorf("Post: %v", err) + return + } + got, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Fatalf("Body ReadAll: %v", err) + } + if !strings.Contains(string(got), deniedMsg) { + t.Errorf("Known bug: response %q does not contain %q", got, deniedMsg) + } +} + type fooProto struct{} func (fooProto) RoundTrip(req *Request) (*Response, error) {neverEnding('a')という無限のボディを送信し、サーバーが401 Unauthorizedを返すシナリオをシミュレートしています。
これらの変更は、net/http クライアントの堅牢性と信頼性を大幅に向上させるものです。
コアとなるコードの解説
このコミットのコアとなる変更は、net/http/transport.go 内の persistConn 構造体と、それに関連する roundTrip、readLoop、そして新しく追加された writeLoop ゴルーチンの協調動作にあります。
persistConn 構造体
writech chan writeRequest: この新しいチャネルは、roundTripメソッドからwriteLoopゴルーチンへのリクエスト書き込み要求を伝達するために使用されます。バッファリングされたチャネル(容量50)であるため、ある程度の要求をキューに入れることができます。
writeRequest 構造体
req *transportRequest: 実際に書き込むHTTPリクエストの情報を保持します。ch chan<- error: 書き込み操作の結果(エラーまたはnil)をroundTripメソッドにフィードバックするためのチャネルです。これにより、書き込みの成功/失敗をroundTripが知ることができます。
getConn 関数
persistConnのインスタンスが作成される際に、writechが初期化され、go pconn.writeLoop()が呼び出されます。これにより、コネクションごとに専用のwriteLoopゴルーチンが起動され、リクエストの書き込みを非同期で処理する準備が整います。
writeLoop ゴルーチン
このゴルーチンは、リクエストボディの実際の書き込みを担当します。
func (pc *persistConn) writeLoop() {
for wr := range pc.writech { // (1) writech から書き込み要求を読み取る
if pc.isBroken() { // (2) コネクションが壊れていないかチェック
wr.ch <- errors.New("http: can't write HTTP request on broken connection")
continue
}
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む
if err == nil {
err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信
}
if err != nil {
pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする
}
wr.ch <- err // (6) 結果を roundTrip に返す
}
}
for wr := range pc.writech:writechからwriteRequestを受け取るまでブロックします。チャネルがクローズされるとループを終了します。if pc.isBroken(): コネクションがすでに壊れている場合、書き込みは行わず、エラーを返します。wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra): HTTPリクエストのヘッダーとボディをbufio.Writer(pc.bw) に書き込みます。この時点ではまだ物理的なネットワークには送信されません。err = pc.bw.Flush():pc.bwにバッファリングされたデータを物理的なネットワークコネクションに送信します。ここで実際にデータがサーバーに送られます。if err != nil { pc.markBroken() }: 書き込みまたはフラッシュ中にエラーが発生した場合、そのコネクションを「壊れた」状態としてマークします。これにより、このコネクションが将来のHTTPリクエストに再利用されるのを防ぎます。wr.ch <- err: 書き込み操作の結果(エラーまたはnil)を、writeRequestに含まれるチャネルを通じてroundTripゴルーチンに返します。
roundTrip メソッド
このメソッドは、単一のHTTPリクエストを送信し、レスポンスを受け取るクライアント側の主要なロジックを含んでいます。
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
// ... (前略) ...
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル
pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信
resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル
pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信
var re responseAndError
WaitResponse:
for {
select { // (5) writeErrCh と resc の両方を同時に待機
case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合
if err != nil {
re = responseAndError{nil, err}
break WaitResponse
}
case re = <-resc: // (7) レスポンスが到着した場合
break WaitResponse
}
}
// ... (後略) ...
return re.res, re.err
}
writeErrCh := make(chan error, 1):writeLoopから書き込みエラーを受け取るためのチャネルを作成します。バッファリングされているため、writeLoopはroundTripがチャネルを読み取るのを待たずにエラーを送信できます。pc.writech <- writeRequest{req, writeErrCh}:writeRequestを作成し、pc.writechに送信します。これにより、writeLoopゴルーチンがこのリクエストの書き込みを開始します。この操作は非同期で行われます。resc := make(chan responseAndError, 1):readLoopからサーバーレスポンスを受け取るためのチャネルを作成します。pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}:readLoopゴルーチンに、このリクエストに対するレスポンスを待機するよう指示します。select { ... }: ここがこの変更の最も重要な部分です。roundTripは、writeErrChとrescの両方を同時に待機します。case err := <-writeErrCh:: もしwriteLoopがリクエストの書き込み中にエラー(例: "broken pipe")を検出してwriteErrChに送信した場合、このケースが実行されます。roundTripはそのエラーを即座に処理し、レスポンスの待機を中断してエラーを返します。case re = <-resc:: もしreadLoopがサーバーからのレスポンスを先に受け取ってrescに送信した場合、このケースが実行されます。roundTripはレスポンスを処理し、書き込みエラーの発生を待たずにレスポンスを返します。
markBroken メソッド
pc.broken = true: コネクションが再利用できない状態であることを示すフラグを設定します。- このメソッドは、基盤となるTCPコネクションをすぐに閉じません。これは、
readLoopがまだそのコネクションから残りのデータを読み取っている可能性があるためです。コネクションの実際のクローズは、readLoopが終了するか、または他のメカニズムによって行われます。
これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みを真に並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
関連リンク
- Go CL (Change List): https://golang.org/cl/6238043
- GitHub Issue #3595: https://github.com/golang/go/issues/3595
参考にした情報源リンク
- Go言語の公式ドキュメント:
net/httpパッケージ - Go言語の並行処理に関するドキュメント(Goroutines, Channels, Select)
- HTTP/1.1 プロトコル仕様 (RFC 2616 または RFC 7230-7235)
- TCP/IP プロトコルに関する一般的な知識
bufioパッケージのドキュメントioパッケージのドキュメント- Go言語のテストフレームワーク
testingパッケージのドキュメント httptestパッケージのドキュメント- Web検索結果: "Go net/http Issue 3595" (このコミットが修正した問題に関連する議論や、
rstAvoidanceDelayなどの関連概念について言及されている可能性があります。)