[インデックス 13364] ファイルの概要
このコミットは、Go言語の標準ライブラリ net/http
パッケージにおけるクライアントの挙動を改善するものです。具体的には、HTTPリクエストのボディを送信している最中にサーバーからレスポンスが返された場合(例えば、認証エラーなど)、以前はクライアントがそのレスポンスを適切に処理できず、書き込みエラー("broken pipe"など)として扱ってしまう問題がありました。このコミットにより、リクエストの書き込みとレスポンスの読み込みが並行して行われるようになり、このようなシナリオでもサーバーからのレスポンスを正確に受け取れるようになりました。
コミット
commit a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
Author: Brad Fitzpatrick <bradfitz@golang.org>
Date: Tue Jun 19 09:20:41 2012 -0700
net/http: make client await response concurrently with writing request
If the server replies with an HTTP response before we're done
writing our body (for instance "401 Unauthorized" response), we
were previously ignoring that, since we returned our write
error ("broken pipe", etc) before ever reading the response.
Now we read and write at the same time.
Fixes #3595
R=rsc, adg
CC=golang-dev
https://golang.org/cl/6238043
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
元コミット内容
net/http
: クライアントがリクエストの書き込みと並行してレスポンスを待機するようにする
もしサーバーがリクエストボディの書き込みが完了する前にHTTPレスポンスを返した場合(例えば「401 Unauthorized」レスポンスなど)、以前はクライアントがそれを無視していました。なぜなら、レスポンスを読み込む前に書き込みエラー(「broken pipe」など)を返していたからです。 これで、読み込みと書き込みが同時に行われるようになります。
Fixes #3595
R=rsc, adg CC=golang-dev https://golang.org/cl/6238043
変更の背景
この変更の背景には、HTTPクライアントがリクエストボディをサーバーに送信している途中で、サーバーが早期にレスポンスを返すという特定のシナリオにおける問題がありました。
従来の net/http
クライアントの実装では、HTTPリクエストの送信(特にボディの書き込み)とサーバーからのレスポンスの受信が、ある程度シーケンシャルに行われていました。つまり、クライアントはまずリクエスト全体を書き込もうとし、その書き込みが完了するか、あるいはエラーが発生するまで、サーバーからのレスポンスを待機しませんでした。
このシーケンシャルな挙動が問題となるのは、以下のようなケースです。
- 早期レスポンス: サーバーがリクエストボディの全てを受け取る前に、何らかの理由でレスポンスを返す場合。例えば、認証が必要なエンドポイントに対して認証情報なしで大きなボディをPOSTした場合、サーバーはボディの受信を待たずに「401 Unauthorized」のようなエラーレスポンスをすぐに返すことがあります。
- 「broken pipe」問題: 従来のクライアントは、リクエストボディの書き込み中にサーバーが接続を切断した場合(早期レスポンスを返した後など)、
write
システムコールが「broken pipe」エラーを返していました。クライアントはこの書き込みエラーを即座に処理し、レスポンスを読み込むことなくエラーを返していました。 - 真のレスポンスの喪失: 結果として、クライアントはサーバーが意図した「401 Unauthorized」のような意味のあるレスポンスを受け取ることができず、単なる書き込みエラーとして処理してしまい、問題の根本原因を特定するのが困難でした。
このコミットは、このような状況下でもクライアントがサーバーからの早期レスポンスを適切に受け取り、処理できるようにするために導入されました。これにより、クライアントはより堅牢になり、サーバーの挙動に柔軟に対応できるようになります。
前提知識の解説
このコミットを理解するためには、以下の技術的知識が役立ちます。
-
HTTPプロトコル:
- リクエスト/レスポンスモデル: クライアントがリクエストを送信し、サーバーがレスポンスを返すという基本的なHTTPの通信モデル。
- リクエストボディ: POSTやPUTなどのメソッドで、クライアントがサーバーに送信するデータ本体。
- レスポンスステータスコード: サーバーがリクエストの結果を示す3桁の数字(例: 200 OK, 401 Unauthorized, 500 Internal Server Error)。
- TCPソケット: HTTP通信の基盤となるトランスポート層のプロトコル。データはストリームとして送受信されます。
-
Go言語の並行処理:
- Goroutine (ゴルーチン): Go言語における軽量なスレッドのようなもの。非常に低コストで多数生成でき、並行処理を実現します。
- Channel (チャネル): Goroutine間で安全にデータを送受信するための通信メカニズム。チャネルを介した通信は同期的に行われるため、データ競合を防ぎます。
select
ステートメント: 複数のチャネル操作を同時に待機し、準備ができたチャネル操作を実行するためのGoの構文。非同期処理の調整に不可欠です。
-
net/http
パッケージ:Transport
:net/http
パッケージにおいて、実際のHTTPリクエストの送信とレスポンスの受信を担当するインターフェース。コネクションの管理、リクエストの多重化、プロキシの処理などを行います。persistConn
:Transport
の内部で、単一のTCPコネクションを介して複数のHTTPリクエスト/レスポンスを処理するための構造体。HTTP/1.1のKeep-Alive機能を実現するために使用されます。readLoop
:persistConn
内で、サーバーからのレスポンスを継続的に読み込むためのGoroutine。roundTrip
:Transport
インターフェースの主要なメソッドで、単一のHTTPリクエストを送信し、対応するレスポンスを受け取る一連の処理をカプセル化します。
-
I/O操作とバッファリング:
io.Reader
/io.Writer
: Go言語における基本的なI/Oインターフェース。データの読み書き操作を抽象化します。bufio.Reader
/bufio.Writer
: バッファリングされたI/Oを提供する構造体。これにより、小さなI/O操作が多数発生するのを防ぎ、効率を向上させます。Flush()
メソッドはバッファの内容を実際に書き込み先に送るために重要です。- 「broken pipe」エラー: パイプやソケットの書き込み側が、読み込み側がすでにクローズされている状態で書き込みを試みた場合に発生するエラー。TCPソケット通信では、サーバーが接続を閉じた後にクライアントが書き込みを続けると発生します。
これらの概念を理解することで、コミットがどのようにして既存の課題を解決し、net/http
クライアントの堅牢性を向上させたのかを深く把握できます。
技術的詳細
このコミットの核心は、HTTPクライアントがリクエストの書き込みとレスポンスの読み込みを並行して行うための新しいメカニズムを導入した点にあります。これにより、サーバーがリクエストボディの受信完了を待たずに早期にレスポンスを返した場合でも、クライアントがそのレスポンスを適切に処理できるようになります。
具体的な技術的変更点は以下の通りです。
-
writech
チャネルの導入:persistConn
構造体にwritech chan writeRequest
という新しいチャネルが追加されました。このチャネルは、roundTrip
ゴルーチンからリクエストの書き込み要求を受け取り、writeLoop
ゴルーチンに渡す役割を担います。writeRequest
は、書き込むリクエスト (*transportRequest
) と、書き込み結果を返すためのチャネル (chan<- error
) を含む新しい構造体です。
-
writeLoop
ゴルーチンの導入:persistConn
の初期化時(getConn
メソッド内)に、go pconn.writeLoop()
という新しいゴルーチンが起動されるようになりました。- この
writeLoop
ゴルーチンは、pc.writech
からwriteRequest
を継続的に読み取ります。 writeRequest
を受け取ると、その中のリクエスト (wr.req.Request
) をpc.bw
(バッファリングされたライター) を使って実際にコネクションに書き込みます。- 書き込みが完了したら、
pc.bw.Flush()
を呼び出してバッファの内容を物理的なコネクションに送信します。 - 書き込み中にエラーが発生した場合(例: サーバーが接続を閉じたことによる「broken pipe」)、
pc.markBroken()
を呼び出してコネクションを「壊れた」状態としてマークし、そのエラーをwriteRequest
に含まれるチャネル (wr.ch
) を通じてroundTrip
ゴルーチンに返します。
-
roundTrip
メソッドの変更:- 以前は
roundTrip
メソッド内でリクエストの書き込みとフラッシュを直接行い、その後レスポンスを待機していました。 - 変更後、リクエストの書き込みは
writeLoop
ゴルーチンに委譲されるようになりました。具体的には、writeRequest
を作成し、pc.writech
に送信します。 - 最も重要な変更は、
select
ステートメントの導入です。roundTrip
は、writeErrCh
(書き込みエラーを受け取るチャネル) とresc
(レスポンスを受け取るチャネル) の両方を同時に待機するようになりました。 - これにより、リクエストの書き込みが完了する前にサーバーからレスポンスが返された場合でも、
resc
チャネルからのイベントが先に発生し、クライアントは早期レスポンスを即座に処理できるようになります。 - もし書き込みエラーが先に発生した場合、そのエラーが優先され、レスポンスの待機は中断されます。
- 以前は
-
markBroken
メソッドの導入:persistConn
にmarkBroken()
という新しいメソッドが追加されました。これは、コネクションが再利用できない状態になったことをマークしますが、基盤となるコネクション自体はすぐに閉じません。これは、readLoop
がまだそのコネクションからデータを読み取っている可能性があるためです。
これらの変更により、net/http
クライアントは、リクエストの書き込みとレスポンスの読み込みという2つの独立したタスクを並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
コアとなるコードの変更箇所
このコミットによる主要なコード変更は、src/pkg/net/http/transport.go
と src/pkg/net/http/transport_test.go
の2つのファイルに集中しています。
src/pkg/net/http/transport.go
-
persistConn
構造体へのwritech
の追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -487,7 +489,8 @@ type persistConn struct { closed bool // whether conn has been closed br *bufio.Reader // from conn bw *bufio.Writer // to conn - reqch chan requestAndChan // written by roundTrip(); read by readLoop() + reqch chan requestAndChan // written by roundTrip; read by readLoop + writech chan writeRequest // written by roundTrip; read by writeLoop isProxy bool
persistConn
にwritech
チャネルが追加され、リクエストの書き込み要求をwriteLoop
に送るために使用されます。 -
getConn
でwritech
の初期化とwriteLoop
の起動:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -323,6 +323,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { cacheKey: cm.String(), conn: conn, reqch: make(chan requestAndChan, 50),\n + writech: make(chan writeRequest, 50), } switch { @@ -380,6 +381,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { pconn.br = bufio.NewReader(pconn.conn) pconn.bw = bufio.NewWriter(pconn.conn) go pconn.readLoop() + go pconn.writeLoop() return pconn, nil }
新しいコネクションが確立される際に、
writech
が初期化され、writeLoop
ゴルーチンが起動されます。 -
readLoop
でwritech
をクローズ:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -519,6 +522,7 @@ func remoteSideClosed(err error) bool { } func (pc *persistConn) readLoop() { + defer close(pc.writech) alive := true var lastbody io.ReadCloser // last response body, if any, read on this connection
readLoop
が終了する際にwritech
をクローズすることで、writeLoop
が適切に終了できるようにします。 -
writeLoop
ゴルーチンの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -615,6 +619,23 @@ func (pc *persistConn) readLoop() { } } +func (pc *persistConn) writeLoop() { + for wr := range pc.writech { + if pc.isBroken() { + wr.ch <- errors.New("http: can't write HTTP request on broken connection") + continue + } + err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) + if err == nil { + err = pc.bw.Flush() + } + if err != nil { + pc.markBroken() + } + wr.ch <- err + } +} + type responseAndError struct { res *Response err error
リクエストの書き込みとフラッシュを担当する新しいゴルーチン
writeLoop
が追加されました。 -
writeRequest
構造体の追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -630,6 +651,15 @@ type requestAndChan struct { addedGzip bool } +// A writeRequest is sent by the readLoop's goroutine to the +// writeLoop's goroutine to write a request while the read loop +// concurrently waits on both the write response and the server's +// reply. +type writeRequest struct { + req *transportRequest + ch chan<- error +} + func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { if pc.mutateHeaderFunc != nil { pc.mutateHeaderFunc(req.extraHeaders())
writeLoop
とroundTrip
間で書き込み要求をやり取りするためのwriteRequest
構造体が定義されました。 -
roundTrip
メソッドの並行処理ロジックの変更:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -652,16 +682,29 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err pc.numExpectedResponses++ pc.lk.Unlock() - err = req.Request.write(pc.bw, pc.isProxy, req.extra) - if err != nil { - pc.close() - return + // Write the request concurrently with waiting for a response, + // in case the server decides to reply before reading our full + // request body. + writeErrCh := make(chan error, 1) + pc.writech <- writeRequest{req, writeErrCh} + + resc := make(chan responseAndError, 1) + pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} + + var re responseAndError +WaitResponse: + for { + select { + case err := <-writeErrCh: + if err != nil { + re = responseAndError{nil, err} + break WaitResponse + } + case re = <-resc: + break WaitResponse + } } - pc.bw.Flush() - ch := make(chan responseAndError, 1) - pc.reqch <- requestAndChan{req.Request, ch, requestedGzip} - re := <-ch pc.lk.Lock() pc.numExpectedResponses-- pc.lk.Unlock()
リクエストの書き込みを
writech
に送信し、writeErrCh
とresc
の両方をselect
で同時に待機するようになりました。 -
markBroken
メソッドの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -669,6 +712,15 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err return re.res, re.err } +// markBroken marks a connection as broken (so it's not reused). +// It differs from close in that it doesn't close the underlying +// connection for use when it's still being read. +func (pc *persistConn) markBroken() { + pc.lk.Lock() + defer pc.lk.Unlock() + pc.broken = true +} + func (pc *persistConn) close() { pc.lk.Lock() defer pc.lk.Unlock()
コネクションを「壊れた」状態としてマークするためのヘルパーメソッドが追加されました。
src/pkg/net/http/transport_test.go
TestIssue3595
テストケースの追加:
このテストケースは、サーバーがリクエストボディを完全に読み取る前にレスポンスを返した場合でも、クライアントがそのレスポンスを正しく受け取れることを検証します。特に、--- a/src/pkg/net/http/transport_test.go +++ b/src/pkg/net/http/transport_test.go @@ -833,6 +833,30 @@ func TestIssue3644(t *testing.T) { } } +// Test that a client receives a server's reply, even if the server doesn't read +// the entire request body. +func TestIssue3595(t *testing.T) { + const deniedMsg = "sorry, denied." + ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) { + Error(w, deniedMsg, StatusUnauthorized) + })) + defer ts.Close() + tr := &Transport{} + c := &Client{Transport: tr} + res, err := c.Post(ts.URL, "application/octet-stream", neverEnding('a')) + if err != nil { + t.Errorf("Post: %v", err) + return + } + got, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Fatalf("Body ReadAll: %v", err) + } + if !strings.Contains(string(got), deniedMsg) { + t.Errorf("Known bug: response %q does not contain %q", got, deniedMsg) + } +} + type fooProto struct{} func (fooProto) RoundTrip(req *Request) (*Response, error) {
neverEnding('a')
という無限のボディを送信し、サーバーが401 Unauthorized
を返すシナリオをシミュレートしています。
これらの変更は、net/http
クライアントの堅牢性と信頼性を大幅に向上させるものです。
コアとなるコードの解説
このコミットのコアとなる変更は、net/http/transport.go
内の persistConn
構造体と、それに関連する roundTrip
、readLoop
、そして新しく追加された writeLoop
ゴルーチンの協調動作にあります。
persistConn
構造体
writech chan writeRequest
: この新しいチャネルは、roundTrip
メソッドからwriteLoop
ゴルーチンへのリクエスト書き込み要求を伝達するために使用されます。バッファリングされたチャネル(容量50)であるため、ある程度の要求をキューに入れることができます。
writeRequest
構造体
req *transportRequest
: 実際に書き込むHTTPリクエストの情報を保持します。ch chan<- error
: 書き込み操作の結果(エラーまたはnil)をroundTrip
メソッドにフィードバックするためのチャネルです。これにより、書き込みの成功/失敗をroundTrip
が知ることができます。
getConn
関数
persistConn
のインスタンスが作成される際に、writech
が初期化され、go pconn.writeLoop()
が呼び出されます。これにより、コネクションごとに専用のwriteLoop
ゴルーチンが起動され、リクエストの書き込みを非同期で処理する準備が整います。
writeLoop
ゴルーチン
このゴルーチンは、リクエストボディの実際の書き込みを担当します。
func (pc *persistConn) writeLoop() {
for wr := range pc.writech { // (1) writech から書き込み要求を読み取る
if pc.isBroken() { // (2) コネクションが壊れていないかチェック
wr.ch <- errors.New("http: can't write HTTP request on broken connection")
continue
}
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む
if err == nil {
err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信
}
if err != nil {
pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする
}
wr.ch <- err // (6) 結果を roundTrip に返す
}
}
for wr := range pc.writech
:writech
からwriteRequest
を受け取るまでブロックします。チャネルがクローズされるとループを終了します。if pc.isBroken()
: コネクションがすでに壊れている場合、書き込みは行わず、エラーを返します。wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra)
: HTTPリクエストのヘッダーとボディをbufio.Writer
(pc.bw
) に書き込みます。この時点ではまだ物理的なネットワークには送信されません。err = pc.bw.Flush()
:pc.bw
にバッファリングされたデータを物理的なネットワークコネクションに送信します。ここで実際にデータがサーバーに送られます。if err != nil { pc.markBroken() }
: 書き込みまたはフラッシュ中にエラーが発生した場合、そのコネクションを「壊れた」状態としてマークします。これにより、このコネクションが将来のHTTPリクエストに再利用されるのを防ぎます。wr.ch <- err
: 書き込み操作の結果(エラーまたはnil)を、writeRequest
に含まれるチャネルを通じてroundTrip
ゴルーチンに返します。
roundTrip
メソッド
このメソッドは、単一のHTTPリクエストを送信し、レスポンスを受け取るクライアント側の主要なロジックを含んでいます。
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
// ... (前略) ...
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル
pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信
resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル
pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信
var re responseAndError
WaitResponse:
for {
select { // (5) writeErrCh と resc の両方を同時に待機
case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合
if err != nil {
re = responseAndError{nil, err}
break WaitResponse
}
case re = <-resc: // (7) レスポンスが到着した場合
break WaitResponse
}
}
// ... (後略) ...
return re.res, re.err
}
writeErrCh := make(chan error, 1)
:writeLoop
から書き込みエラーを受け取るためのチャネルを作成します。バッファリングされているため、writeLoop
はroundTrip
がチャネルを読み取るのを待たずにエラーを送信できます。pc.writech <- writeRequest{req, writeErrCh}
:writeRequest
を作成し、pc.writech
に送信します。これにより、writeLoop
ゴルーチンがこのリクエストの書き込みを開始します。この操作は非同期で行われます。resc := make(chan responseAndError, 1)
:readLoop
からサーバーレスポンスを受け取るためのチャネルを作成します。pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
:readLoop
ゴルーチンに、このリクエストに対するレスポンスを待機するよう指示します。select { ... }
: ここがこの変更の最も重要な部分です。roundTrip
は、writeErrCh
とresc
の両方からのイベントを同時に待機します。case err := <-writeErrCh:
: もしwriteLoop
がリクエストの書き込み中にエラー(例: "broken pipe")を検出してwriteErrCh
に送信した場合、このケースが実行されます。roundTrip
はそのエラーを即座に処理し、レスポンスの待機を中断してエラーを返します。case re = <-resc:
: もしreadLoop
がサーバーからのレスポンスを先に受け取ってresc
に送信した場合、このケースが実行されます。roundTrip
はレスポンスを処理し、書き込みエラーの発生を待たずにレスポンスを返します。
markBroken
メソッド
pc.broken = true
: コネクションが再利用できない状態であることを示すフラグを設定します。- このメソッドは、基盤となるTCPコネクションをすぐに閉じません。これは、
readLoop
がまだそのコネクションから残りのデータを読み取っている可能性があるためです。コネクションの実際のクローズは、readLoop
が終了するか、または他のメカニズムによって行われます。
これらの変更により、net/http
クライアントは、リクエストの書き込みとレスポンスの読み込みを真に並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
関連リンク
- Go CL (Change List): https://golang.org/cl/6238043
- GitHub Issue #3595: https://github.com/golang/go/issues/3595
参考にした情報源リンク
- Go言語の公式ドキュメント:
net/http
パッケージ - Go言語の並行処理に関するドキュメント(Goroutines, Channels, Select)
- HTTP/1.1 プロトコル仕様 (RFC 2616 または RFC 7230-7235)
- TCP/IP プロトコルに関する一般的な知識
bufio
パッケージのドキュメントio
パッケージのドキュメント- Go言語のテストフレームワーク
testing
パッケージのドキュメント httptest
パッケージのドキュメント
[インデックス 13364] ファイルの概要
このコミットは、Go言語の標準ライブラリ net/http
パッケージにおけるクライアントの挙動を改善するものです。具体的には、HTTPリクエストのボディを送信している最中にサーバーからレスポンスが返された場合(例えば、認証エラーなど)、以前はクライアントがそのレスポンスを適切に処理できず、書き込みエラー("broken pipe"など)として扱ってしまう問題がありました。このコミットにより、リクエストの書き込みとレスポンスの読み込みが並行して行われるようになり、このようなシナリオでもサーバーからのレスポンスを正確に受け取れるようになりました。
コミット
commit a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
Author: Brad Fitzpatrick <bradfitz@golang.org>
Date: Tue Jun 19 09:20:41 2012 -0700
net/http: make client await response concurrently with writing request
If the server replies with an HTTP response before we're done
writing our body (for instance "401 Unauthorized" response), we
were previously ignoring that, since we returned our write
error ("broken pipe", etc) before ever reading the response.
Now we read and write at the same time.
Fixes #3595
R=rsc, adg
CC=golang-dev
https://golang.org/cl/6238043
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
元コミット内容
net/http
: クライアントがリクエストの書き込みと並行してレスポンスを待機するようにする
もしサーバーがリクエストボディの書き込みが完了する前にHTTPレスポンスを返した場合(例えば「401 Unauthorized」レスポンスなど)、以前はクライアントがそれを無視していました。なぜなら、レスポンスを読み込む前に書き込みエラー(「broken pipe」など)を返していたからです。 これで、読み込みと書き込みが同時に行われるようになります。
Fixes #3595
R=rsc, adg CC=golang-dev https://golang.org/cl/6238043
変更の背景
この変更の背景には、HTTPクライアントがリクエストボディをサーバーに送信している途中で、サーバーが早期にレスポンスを返すという特定のシナリオにおける問題がありました。
従来の net/http
クライアントの実装では、HTTPリクエストの送信(特にボディの書き込み)とサーバーからのレスポンスの受信が、ある程度シーケンシャルに行われていました。つまり、クライアントはまずリクエスト全体を書き込もうとし、その書き込みが完了するか、あるいはエラーが発生するまで、サーバーからのレスポンスを待機しませんでした。
このシーケンシャルな挙動が問題となるのは、以下のようなケースです。
- 早期レスポンス: サーバーがリクエストボディの全てを受け取る前に、何らかの理由でレスポンスを返す場合。例えば、認証が必要なエンドポイントに対して認証情報なしで大きなボディをPOSTした場合、サーバーはボディの受信を待たずに「401 Unauthorized」のようなエラーレスポンスをすぐに返すことがあります。
- 「broken pipe」問題: 従来のクライアントは、リクエストボディの書き込み中にサーバーが接続を切断した場合(早期レスポンスを返した後など)、
write
システムコールが「broken pipe」エラーを返していました。クライアントはこの書き込みエラーを即座に処理し、レスポンスを読み込むことなくエラーを返していました。 - 真のレスポンスの喪失: 結果として、クライアントはサーバーが意図した「401 Unauthorized」のような意味のあるレスポンスを受け取ることができず、単なる書き込みエラーとして処理してしまい、問題の根本原因を特定するのが困難でした。
このコミットは、このような状況下でもクライアントがサーバーからの早期レスポンスを適切に受け取り、処理できるようにするために導入されました。これにより、クライアントはより堅牢になり、サーバーの挙動に柔軟に対応できるようになります。
前提知識の解説
このコミットを理解するためには、以下の技術的知識が役立ちます。
-
HTTPプロトコル:
- リクエスト/レスポンスモデル: クライアントがリクエストを送信し、サーバーがレスポンスを返すという基本的なHTTPの通信モデル。
- リクエストボディ: POSTやPUTなどのメソッドで、クライアントがサーバーに送信するデータ本体。
- レスポンスステータスコード: サーバーがリクエストの結果を示す3桁の数字(例: 200 OK, 401 Unauthorized, 500 Internal Server Error)。
- TCPソケット: HTTP通信の基盤となるトランスポート層のプロトコル。データはストリームとして送受信されます。
-
Go言語の並行処理:
- Goroutine (ゴルーチン): Go言語における軽量なスレッドのようなもの。非常に低コストで多数生成でき、並行処理を実現します。
- Channel (チャネル): Goroutine間で安全にデータを送受信するための通信メカニズム。チャネルを介した通信は同期的に行われるため、データ競合を防ぎます。
select
ステートメント: 複数のチャネル操作を同時に待機し、準備ができたチャネル操作を実行するためのGoの構文。非同期処理の調整に不可欠です。
-
net/http
パッケージ:Transport
:net/http
パッケージにおいて、実際のHTTPリクエストの送信とレスポンスの受信を担当するインターフェース。コネクションの管理、リクエストの多重化、プロキシの処理などを行います。persistConn
:Transport
の内部で、単一のTCPコネクションを介して複数のHTTPリクエスト/レスポンスを処理するための構造体。HTTP/1.1のKeep-Alive機能を実現するために使用されます。readLoop
:persistConn
内で、サーバーからのレスポンスを継続的に読み込むためのGoroutine。roundTrip
:Transport
インターフェースの主要なメソッドで、単一のHTTPリクエストを送信し、対応するレスポンスを受け取る一連の処理をカプセル化します。
-
I/O操作とバッファリング:
io.Reader
/io.Writer
: Go言語における基本的なI/Oインターフェース。データの読み書き操作を抽象化します。bufio.Reader
/bufio.Writer
: バッファリングされたI/Oを提供する構造体。これにより、小さなI/O操作が多数発生するのを防ぎ、効率を向上させます。Flush()
メソッドはバッファの内容を実際に書き込み先に送るために重要です。- 「broken pipe」エラー: パイプやソケットの書き込み側が、読み込み側がすでにクローズされている状態で書き込みを試みた場合に発生するエラー。TCPソケット通信では、サーバーが接続を閉じた後にクライアントが書き込みを続けると発生します。
これらの概念を理解することで、コミットがどのようにして既存の課題を解決し、net/http
クライアントの堅牢性を向上させたのかを深く把握できます。
技術的詳細
このコミットの核心は、HTTPクライアントがリクエストの書き込みとレスポンスの読み込みを並行して行うための新しいメカニズムを導入した点にあります。これにより、サーバーがリクエストボディの受信完了を待たずに早期にレスポンスを返した場合でも、クライアントがそのレスポンスを適切に処理できるようになります。
具体的な技術的変更点は以下の通りです。
-
writech
チャネルの導入:persistConn
構造体にwritech chan writeRequest
という新しいチャネルが追加されました。このチャネルは、roundTrip
ゴルーチンからリクエストの書き込み要求を受け取り、writeLoop
ゴルーチンに渡す役割を担います。writeRequest
は、書き込むリクエスト (*transportRequest
) と、書き込み結果を返すためのチャネル (chan<- error
) を含む新しい構造体です。
-
writeLoop
ゴルーチンの導入:persistConn
の初期化時(getConn
メソッド内)に、go pconn.writeLoop()
という新しいゴルーチンが起動されるようになりました。- この
writeLoop
ゴルーチンは、pc.writech
からwriteRequest
を継続的に読み取ります。 writeRequest
を受け取ると、その中のリクエスト (wr.req.Request
) をpc.bw
(バッファリングされたライター) を使って実際にコネクションに書き込みます。- 書き込みが完了したら、
pc.bw.Flush()
を呼び出してバッファの内容を物理的なコネクションに送信します。 - 書き込み中にエラーが発生した場合(例: サーバーが接続を閉じたことによる「broken pipe」)、
pc.markBroken()
を呼び出してコネクションを「壊れた」状態としてマークし、そのエラーをwriteRequest
に含まれるチャネル (wr.ch
) を通じてroundTrip
ゴルーチンに返します。
-
roundTrip
メソッドの変更:- 以前は
roundTrip
メソッド内でリクエストの書き込みとフラッシュを直接行い、その後レスポンスを待機していました。 - 変更後、リクエストの書き込みは
writeLoop
ゴルーチンに委譲されるようになりました。具体的には、writeRequest
を作成し、pc.writech
に送信します。 - 最も重要な変更は、
select
ステートメントの導入です。roundTrip
は、writeErrCh
(書き込みエラーを受け取るチャネル) とresc
(レスポンスを受け取るチャネル) の両方を同時に待機するようになりました。 - これにより、リクエストの書き込みが完了する前にサーバーからレスポンスが返された場合でも、
resc
チャネルからのイベントが先に発生し、クライアントは早期レスポンスを即座に処理できるようになります。 - もし書き込みエラーが先に発生した場合、そのエラーが優先され、レスポンスの待機は中断されます。
- 以前は
-
markBroken
メソッドの導入:persistConn
にmarkBroken()
という新しいメソッドが追加されました。これは、コネクションが再利用できない状態になったことをマークしますが、基盤となるコネクション自体はすぐに閉じません。これは、readLoop
がまだそのコネクションからデータを読み取っている可能性があるためです。
これらの変更により、net/http
クライアントは、リクエストの書き込みとレスポンスの読み込みという2つの独立したタスクを並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
コアとなるコードの変更箇所
このコミットによる主要なコード変更は、src/pkg/net/http/transport.go
と src/pkg/net/http/transport_test.go
の2つのファイルに集中しています。
src/pkg/net/http/transport.go
-
persistConn
構造体へのwritech
の追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -487,7 +489,8 @@ type persistConn struct { closed bool // whether conn has been closed br *bufio.Reader // from conn bw *bufio.Writer // to conn - reqch chan requestAndChan // written by roundTrip(); read by readLoop() + reqch chan requestAndChan // written by roundTrip; read by readLoop + writech chan writeRequest // written by roundTrip; read by writeLoop isProxy bool
persistConn
にwritech
チャネルが追加され、リクエストの書き込み要求をwriteLoop
に送るために使用されます。 -
getConn
でwritech
の初期化とwriteLoop
の起動:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -323,6 +323,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { cacheKey: cm.String(), conn: conn, reqch: make(chan requestAndChan, 50),\n + writech: make(chan writeRequest, 50), } switch { @@ -380,6 +381,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { pconn.br = bufio.NewReader(pconn.conn) pconn.bw = bufio.NewWriter(pconn.conn) go pconn.readLoop() + go pconn.writeLoop() return pconn, nil }
新しいコネクションが確立される際に、
writech
が初期化され、writeLoop
ゴルーチンが起動されます。 -
readLoop
でwritech
をクローズ:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -519,6 +522,7 @@ func remoteSideClosed(err error) bool { } func (pc *persistConn) readLoop() { + defer close(pc.writech) alive := true var lastbody io.ReadCloser // last response body, if any, read on this connection
readLoop
が終了する際にwritech
をクローズすることで、writeLoop
が適切に終了できるようにします。 -
writeLoop
ゴルーチンの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -615,6 +619,23 @@ func (pc *persistConn) readLoop() { } } +func (pc *persistConn) writeLoop() { + for wr := range pc.writech { // (1) writech から書き込み要求を読み取る + if pc.isBroken() { // (2) コネクションが壊れていないかチェック + wr.ch <- errors.New("http: can't write HTTP request on broken connection") + continue + } + err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む + if err == nil { + err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信 + } + if err != nil { + pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする + } + wr.ch <- err // (6) 結果を roundTrip に返す + } +} + type responseAndError struct { res *Response err error
リクエストの書き込みとフラッシュを担当する新しいゴルーチン
writeLoop
が追加されました。 -
writeRequest
構造体の追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -630,6 +651,15 @@ type requestAndChan struct { addedGzip bool } +// A writeRequest is sent by the readLoop's goroutine to the +// writeLoop's goroutine to write a request while the read loop +// concurrently waits on both the write response and the server's +// reply. +type writeRequest struct { + req *transportRequest + ch chan<- error +} + func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { if pc.mutateHeaderFunc != nil { pc.mutateHeaderFunc(req.extraHeaders())
writeLoop
とroundTrip
間で書き込み要求をやり取りするためのwriteRequest
構造体が定義されました。 -
roundTrip
メソッドの並行処理ロジックの変更:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -652,16 +682,29 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err pc.numExpectedResponses++ pc.lk.Unlock() - err = req.Request.write(pc.bw, pc.isProxy, req.extra) - if err != nil { - pc.close() - return + // Write the request concurrently with waiting for a response, + // in case the server decides to reply before reading our full + // request body. + writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル + pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信 + + resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル + pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信 + + var re responseAndError +WaitResponse: + for { + select { // (5) writeErrCh と resc の両方を同時に待機 + case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合 + if err != nil { + re = responseAndError{nil, err} + break WaitResponse + } + case re = <-resc: // (7) レスポンスが到着した場合 + break WaitResponse + } } - pc.bw.Flush() - ch := make(chan responseAndError, 1) - pc.reqch <- requestAndChan{req.Request, ch, requestedGzip} - re := <-ch pc.lk.Lock() pc.numExpectedResponses-- pc.lk.Unlock()
リクエストの書き込みを
writech
に送信し、writeErrCh
とresc
の両方をselect
で同時に待機するようになりました。 -
markBroken
メソッドの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -669,6 +712,15 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err return re.res, re.err } +// markBroken marks a connection as broken (so it's not reused). +// It differs from close in that it doesn't close the underlying +// connection for use when it's still being read. +func (pc *persistConn) markBroken() { + pc.lk.Lock() + defer pc.lk.Unlock() + pc.broken = true +} + func (pc *persistConn) close() { pc.lk.Lock() defer pc.lk.Unlock()
コネクションを「壊れた」状態としてマークするためのヘルパーメソッドが追加されました。
src/pkg/net/http/transport_test.go
TestIssue3595
テストケースの追加:
このテストケースは、サーバーがリクエストボディを完全に読み取る前にレスポンスを返した場合でも、クライアントがそのレスポンスを正しく受け取れることを検証します。特に、--- a/src/pkg/net/http/transport_test.go +++ b/src/pkg/net/http/transport_test.go @@ -833,6 +833,30 @@ func TestIssue3644(t *testing.T) { } } +// Test that a client receives a server's reply, even if the server doesn't read +// the entire request body. +func TestIssue3595(t *testing.T) { + const deniedMsg = "sorry, denied." + ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) { + Error(w, deniedMsg, StatusUnauthorized) + })) + defer ts.Close() + tr := &Transport{} + c := &Client{Transport: tr} + res, err := c.Post(ts.URL, "application/octet-stream", neverEnding('a')) + if err != nil { + t.Errorf("Post: %v", err) + return + } + got, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Fatalf("Body ReadAll: %v", err) + } + if !strings.Contains(string(got), deniedMsg) { + t.Errorf("Known bug: response %q does not contain %q", got, deniedMsg) + } +} + type fooProto struct{} func (fooProto) RoundTrip(req *Request) (*Response, error) {
neverEnding('a')
という無限のボディを送信し、サーバーが401 Unauthorized
を返すシナリオをシミュレートしています。
これらの変更は、net/http
クライアントの堅牢性と信頼性を大幅に向上させるものです。
コアとなるコードの解説
このコミットのコアとなる変更は、net/http/transport.go
内の persistConn
構造体と、それに関連する roundTrip
、readLoop
、そして新しく追加された writeLoop
ゴルーチンの協調動作にあります。
persistConn
構造体
writech chan writeRequest
: この新しいチャネルは、roundTrip
メソッドからwriteLoop
ゴルーチンへのリクエスト書き込み要求を伝達するために使用されます。バッファリングされたチャネル(容量50)であるため、ある程度の要求をキューに入れることができます。
writeRequest
構造体
req *transportRequest
: 実際に書き込むHTTPリクエストの情報を保持します。ch chan<- error
: 書き込み操作の結果(エラーまたはnil)をroundTrip
メソッドにフィードバックするためのチャネルです。これにより、書き込みの成功/失敗をroundTrip
が知ることができます。
getConn
関数
persistConn
のインスタンスが作成される際に、writech
が初期化され、go pconn.writeLoop()
が呼び出されます。これにより、コネクションごとに専用のwriteLoop
ゴルーチンが起動され、リクエストの書き込みを非同期で処理する準備が整います。
writeLoop
ゴルーチン
このゴルーチンは、リクエストボディの実際の書き込みを担当します。
func (pc *persistConn) writeLoop() {
for wr := range pc.writech { // (1) writech から書き込み要求を読み取る
if pc.isBroken() { // (2) コネクションが壊れていないかチェック
wr.ch <- errors.New("http: can't write HTTP request on broken connection")
continue
}
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む
if err == nil {
err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信
}
if err != nil {
pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする
}
wr.ch <- err // (6) 結果を roundTrip に返す
}
}
for wr := range pc.writech
:writech
からwriteRequest
を受け取るまでブロックします。チャネルがクローズされるとループを終了します。if pc.isBroken()
: コネクションがすでに壊れている場合、書き込みは行わず、エラーを返します。wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra)
: HTTPリクエストのヘッダーとボディをbufio.Writer
(pc.bw
) に書き込みます。この時点ではまだ物理的なネットワークには送信されません。err = pc.bw.Flush()
:pc.bw
にバッファリングされたデータを物理的なネットワークコネクションに送信します。ここで実際にデータがサーバーに送られます。if err != nil { pc.markBroken() }
: 書き込みまたはフラッシュ中にエラーが発生した場合、そのコネクションを「壊れた」状態としてマークします。これにより、このコネクションが将来のHTTPリクエストに再利用されるのを防ぎます。wr.ch <- err
: 書き込み操作の結果(エラーまたはnil)を、writeRequest
に含まれるチャネルを通じてroundTrip
ゴルーチンに返します。
roundTrip
メソッド
このメソッドは、単一のHTTPリクエストを送信し、レスポンスを受け取るクライアント側の主要なロジックを含んでいます。
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
// ... (前略) ...
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル
pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信
resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル
pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信
var re responseAndError
WaitResponse:
for {
select { // (5) writeErrCh と resc の両方を同時に待機
case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合
if err != nil {
re = responseAndError{nil, err}
break WaitResponse
}
case re = <-resc: // (7) レスポンスが到着した場合
break WaitResponse
}
}
// ... (後略) ...
return re.res, re.err
}
writeErrCh := make(chan error, 1)
:writeLoop
から書き込みエラーを受け取るためのチャネルを作成します。バッファリングされているため、writeLoop
はroundTrip
がチャネルを読み取るのを待たずにエラーを送信できます。pc.writech <- writeRequest{req, writeErrCh}
:writeRequest
を作成し、pc.writech
に送信します。これにより、writeLoop
ゴルーチンがこのリクエストの書き込みを開始します。この操作は非同期で行われます。resc := make(chan responseAndError, 1)
:readLoop
からサーバーレスポンスを受け取るためのチャネルを作成します。pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
:readLoop
ゴルーチンに、このリクエストに対するレスポンスを待機するよう指示します。select { ... }
: ここがこの変更の最も重要な部分です。roundTrip
は、writeErrCh
とresc
の両方からのイベントを同時に待機します。case err := <-writeErrCh:
: もしwriteLoop
がリクエストの書き込み中にエラー(例: "broken pipe")を検出してwriteErrCh
に送信した場合、このケースが実行されます。roundTrip
はそのエラーを即座に処理し、レスポンスの待機を中断してエラーを返します。case re = <-resc:
: もしreadLoop
がサーバーからのレスポンスを先に受け取ってresc
に送信した場合、このケースが実行されます。roundTrip
はレスポンスを処理し、書き込みエラーの発生を待たずにレスポンスを返します。
markBroken
メソッド
pc.broken = true
: コネクションが再利用できない状態であることを示すフラグを設定します。- このメソッドは、基盤となるTCPコネクションをすぐに閉じません。これは、
readLoop
がまだそのコネクションから残りのデータを読み取っている可能性があるためです。コネクションの実際のクローズは、readLoop
が終了するか、または他のメカニズムによって行われます。
これらの変更により、net/http
クライアントは、リクエストの書き込みとレスポンスの読み込みを真に並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
関連リンク
- Go CL (Change List): https://golang.org/cl/6238043
- GitHub Issue #3595: https://github.com/golang/go/issues/3595
参考にした情報源リンク
- Go言語の公式ドキュメント:
net/http
パッケージ - Go言語の並行処理に関するドキュメント(Goroutines, Channels, Select)
- HTTP/1.1 プロトコル仕様 (RFC 2616 または RFC 7230-7235)
- TCP/IP プロトコルに関する一般的な知識
bufio
パッケージのドキュメントio
パッケージのドキュメント- Go言語のテストフレームワーク
testing
パッケージのドキュメント httptest
パッケージのドキュメント- Web検索結果: "Go net/http Issue 3595" (このコミットが修正した問題に関連する議論や、
rstAvoidanceDelay
などの関連概念について言及されている可能性があります。)
[インデックス 13364] ファイルの概要
このコミットは、Go言語の標準ライブラリ net/http
パッケージにおけるクライアントの挙動を改善するものです。具体的には、HTTPリクエストのボディを送信している最中にサーバーからレスポンスが返された場合(例えば、認証エラーなど)、以前はクライアントがそのレスポンスを適切に処理できず、書き込みエラー("broken pipe"など)として扱ってしまう問題がありました。このコミットにより、リクエストの書き込みとレスポンスの読み込みが並行して行われるようになり、このようなシナリオでもサーバーからのレスポンスを正確に受け取れるようになりました。
コミット
commit a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
Author: Brad Fitzpatrick <bradfitz@golang.org>
Date: Tue Jun 19 09:20:41 2012 -0700
net/http: make client await response concurrently with writing request
If the server replies with an HTTP response before we're done
writing our body (for instance "401 Unauthorized" response), we
were previously ignoring that, since we returned our write
error ("broken pipe", etc) before ever reading the response.
Now we read and write at the same time.
Fixes #3595
R=rsc, adg
CC=golang-dev
https://golang.org/cl/6238043
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
元コミット内容
net/http
: クライアントがリクエストの書き込みと並行してレスポンスを待機するようにする
もしサーバーがリクエストボディの書き込みが完了する前にHTTPレスポンスを返した場合(例えば「401 Unauthorized」レスポンスなど)、以前はクライアントがそれを無視していました。なぜなら、レスポンスを読み込む前に書き込みエラー(「broken pipe」など)を返していたからです。 これで、読み込みと書き込みが同時に行われるようになります。
Fixes #3595
R=rsc, adg CC=golang-dev https://golang.org/cl/6238043
変更の背景
この変更の背景には、HTTPクライアントがリクエストボディをサーバーに送信している途中で、サーバーが早期にレスポンスを返すという特定のシナリオにおける問題がありました。
従来の net/http
クライアントの実装では、HTTPリクエストの送信(特にボディの書き込み)とサーバーからのレスポンスの受信が、ある程度シーケンシャルに行われていました。つまり、クライアントはまずリクエスト全体を書き込もうとし、その書き込みが完了するか、あるいはエラーが発生するまで、サーバーからのレスポンスを待機しませんでした。
このシーケンシャルな挙動が問題となるのは、以下のようなケースです。
- 早期レスポンス: サーバーがリクエストボディの全てを受け取る前に、何らかの理由でレスポンスを返す場合。例えば、認証が必要なエンドポイントに対して認証情報なしで大きなボディをPOSTした場合、サーバーはボディの受信を待たずに「401 Unauthorized」のようなエラーレスポンスをすぐに返すことがあります。
- 「broken pipe」問題: 従来のクライアントは、リクエストボディの書き込み中にサーバーが接続を切断した場合(早期レスポンスを返した後など)、
write
システムコールが「broken pipe」エラーを返していました。クライアントはこの書き込みエラーを即座に処理し、レスポンスを読み込むことなくエラーを返していました。 - 真のレスポンスの喪失: 結果として、クライアントはサーバーが意図した「401 Unauthorized」のような意味のあるレスポンスを受け取ることができず、単なる書き込みエラーとして処理してしまい、問題の根本原因を特定するのが困難でした。
このコミットは、このような状況下でもクライアントがサーバーからの早期レスポンスを適切に受け取り、処理できるようにするために導入されました。これにより、クライアントはより堅牢になり、サーバーの挙動に柔軟に対応できるようになります。
前提知識の解説
このコミットを理解するためには、以下の技術的知識が役立ちます。
-
HTTPプロトコル:
- リクエスト/レスポンスモデル: クライアントがリクエストを送信し、サーバーがレスポンスを返すという基本的なHTTPの通信モデル。
- リクエストボディ: POSTやPUTなどのメソッドで、クライアントがサーバーに送信するデータ本体。
- レスポンスステータスコード: サーバーがリクエストの結果を示す3桁の数字(例: 200 OK, 401 Unauthorized, 500 Internal Server Error)。
- TCPソケット: HTTP通信の基盤となるトランスポート層のプロトコル。データはストリームとして送受信されます。
-
Go言語の並行処理:
- Goroutine (ゴルーチン): Go言語における軽量なスレッドのようなもの。非常に低コストで多数生成でき、並行処理を実現します。
- Channel (チャネル): Goroutine間で安全にデータを送受信するための通信メカニズム。チャネルを介した通信は同期的に行われるため、データ競合を防ぎます。
select
ステートメント: 複数のチャネル操作を同時に待機し、準備ができたチャネル操作を実行するためのGoの構文。非同期処理の調整に不可欠です。
-
net/http
パッケージ:Transport
:net/http
パッケージにおいて、実際のHTTPリクエストの送信とレスポンスの受信を担当するインターフェース。コネクションの管理、リクエストの多重化、プロキシの処理などを行います。persistConn
:Transport
の内部で、単一のTCPコネクションを介して複数のHTTPリクエスト/レスポンスを処理するための構造体。HTTP/1.1のKeep-Alive機能を実現するために使用されます。readLoop
:persistConn
内で、サーバーからのレスポンスを継続的に読み込むためのGoroutine。roundTrip
:Transport
インターフェースの主要なメソッドで、単一のHTTPリクエストを送信し、対応するレスポンスを受け取る一連の処理をカプセル化します。
-
I/O操作とバッファリング:
io.Reader
/io.Writer
: Go言語における基本的なI/Oインターフェース。データの読み書き操作を抽象化します。bufio.Reader
/bufio.Writer
: バッファリングされたI/Oを提供する構造体。これにより、小さなI/O操作が多数発生するのを防ぎ、効率を向上させます。Flush()
メソッドはバッファの内容を実際に書き込み先に送るために重要です。- 「broken pipe」エラー: パイプやソケットの書き込み側が、読み込み側がすでにクローズされている状態で書き込みを試みた場合に発生するエラー。TCPソケット通信では、サーバーが接続を閉じた後にクライアントが書き込みを続けると発生します。
これらの概念を理解することで、コミットがどのようにして既存の課題を解決し、net/http
クライアントの堅牢性を向上させたのかを深く把握できます。
技術的詳細
このコミットの核心は、HTTPクライアントがリクエストの書き込みとレスポンスの読み込みを並行して行うための新しいメカニズムを導入した点にあります。これにより、サーバーがリクエストボディの受信完了を待たずに早期にレスポンスを返した場合でも、クライアントがそのレスポンスを適切に処理できるようになります。
具体的な技術的変更点は以下の通りです。
-
writech
チャネルの導入:persistConn
構造体にwritech chan writeRequest
という新しいチャネルが追加されました。このチャネルは、roundTrip
ゴルーチンからリクエストの書き込み要求を受け取り、writeLoop
ゴルーチンに渡す役割を担います。writeRequest
は、書き込むリクエスト (*transportRequest
) と、書き込み結果を返すためのチャネル (chan<- error
) を含む新しい構造体です。
-
writeLoop
ゴルーチンの導入:persistConn
の初期化時(getConn
メソッド内)に、go pconn.writeLoop()
という新しいゴルーチンが起動されるようになりました。- この
writeLoop
ゴルーチンは、pc.writech
からwriteRequest
を継続的に読み取ります。 writeRequest
を受け取ると、その中のリクエスト (wr.req.Request
) をpc.bw
(バッファリングされたライター) を使って実際にコネクションに書き込みます。- 書き込みが完了したら、
pc.bw.Flush()
を呼び出してバッファの内容を物理的なコネクションに送信します。 - 書き込み中にエラーが発生した場合(例: サーバーが接続を閉じたことによる「broken pipe」)、
pc.markBroken()
を呼び出してコネクションを「壊れた」状態としてマークし、そのエラーをwriteRequest
に含まれるチャネル (wr.ch
) を通じてroundTrip
ゴルーチンに返します。
-
roundTrip
メソッドの変更:- 以前は
roundTrip
メソッド内でリクエストの書き込みとフラッシュを直接行い、その後レスポンスを待機していました。 - 変更後、リクエストの書き込みは
writeLoop
ゴルーチンに委譲されるようになりました。具体的には、writeRequest
を作成し、pc.writech
に送信します。 - 最も重要な変更は、
select
ステートメントの導入です。roundTrip
は、writeErrCh
(書き込みエラーを受け取るチャネル) とresc
(レスポンスを受け取るチャネル) の両方を同時に待機するようになりました。 - これにより、リクエストの書き込みが完了する前にサーバーからレスポンスが返された場合でも、
resc
チャネルからのイベントが先に発生し、クライアントは早期レスポンスを即座に処理できるようになります。 - もし書き込みエラーが先に発生した場合、そのエラーが優先され、レスポンスの待機は中断されます。
- 以前は
-
markBroken
メソッドの導入:persistConn
にmarkBroken()
という新しいメソッドが追加されました。これは、コネクションが再利用できない状態になったことをマークしますが、基盤となるコネクション自体はすぐに閉じません。これは、readLoop
がまだそのコネクションからデータを読み取っている可能性があるためです。
これらの変更により、net/http
クライアントは、リクエストの書き込みとレスポンスの読み込みという2つの独立したタスクを並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
コアとなるコードの変更箇所
このコミットによる主要なコード変更は、src/pkg/net/http/transport.go
と src/pkg/net/http/transport_test.go
の2つのファイルに集中しています。
src/pkg/net/http/transport.go
-
persistConn
構造体へのwritech
の追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -487,7 +489,8 @@ type persistConn struct { closed bool // whether conn has been closed br *bufio.Reader // from conn bw *bufio.Writer // to conn - reqch chan requestAndChan // written by roundTrip(); read by readLoop() + reqch chan requestAndChan // written by roundTrip; read by readLoop + writech chan writeRequest // written by roundTrip; read by writeLoop isProxy bool
persistConn
にwritech
チャネルが追加され、リクエストの書き込み要求をwriteLoop
に送るために使用されます。 -
getConn
でwritech
の初期化とwriteLoop
の起動:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -323,6 +323,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { cacheKey: cm.String(), conn: conn, reqch: make(chan requestAndChan, 50),\n + writech: make(chan writeRequest, 50), } switch { @@ -380,6 +381,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) { pconn.br = bufio.NewReader(pconn.conn) pconn.bw = bufio.NewWriter(pconn.conn) go pconn.readLoop() + go pconn.writeLoop() return pconn, nil }
新しいコネクションが確立される際に、
writech
が初期化され、writeLoop
ゴルーチンが起動されます。 -
readLoop
でwritech
をクローズ:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -519,6 +522,7 @@ func remoteSideClosed(err error) bool { } func (pc *persistConn) readLoop() { + defer close(pc.writech) alive := true var lastbody io.ReadCloser // last response body, if any, read on this connection
readLoop
が終了する際にwritech
をクローズすることで、writeLoop
が適切に終了できるようにします。 -
writeLoop
ゴルーチンの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -615,6 +619,23 @@ func (pc *persistConn) readLoop() { } } +func (pc *persistConn) writeLoop() { + for wr := range pc.writech { // (1) writech から書き込み要求を読み取る + if pc.isBroken() { // (2) コネクションが壊れていないかチェック + wr.ch <- errors.New("http: can't write HTTP request on broken connection") + continue + } + err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む + if err == nil { + err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信 + } + if err != nil { + pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする + } + wr.ch <- err // (6) 結果を roundTrip に返す + } +} + type responseAndError struct { res *Response err error
リクエストの書き込みとフラッシュを担当する新しいゴルーチン
writeLoop
が追加されました。 -
writeRequest
構造体の追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -630,6 +651,15 @@ type requestAndChan struct { addedGzip bool } +// A writeRequest is sent by the readLoop's goroutine to the +// writeLoop's goroutine to write a request while the read loop +// concurrently waits on both the write response and the server's +// reply. +type writeRequest struct { + req *transportRequest + ch chan<- error +} + func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { if pc.mutateHeaderFunc != nil { pc.mutateHeaderFunc(req.extraHeaders())
writeLoop
とroundTrip
間で書き込み要求をやり取りするためのwriteRequest
構造体が定義されました。 -
roundTrip
メソッドの並行処理ロジックの変更:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -652,16 +682,29 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err pc.numExpectedResponses++ pc.lk.Unlock() - err = req.Request.write(pc.bw, pc.isProxy, req.extra) - if err != nil { - pc.close() - return + // Write the request concurrently with waiting for a response, + // in case the server decides to reply before reading our full + // request body. + writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル + pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信 + + resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル + pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信 + + var re responseAndError +WaitResponse: + for { + select { // (5) writeErrCh と resc の両方を同時に待機 + case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合 + if err != nil { + re = responseAndError{nil, err} + break WaitResponse + } + case re = <-resc: // (7) レスポンスが到着した場合 + break WaitResponse + } } - pc.bw.Flush() - ch := make(chan responseAndError, 1) - pc.reqch <- requestAndChan{req.Request, ch, requestedGzip} - re := <-ch pc.lk.Lock() pc.numExpectedResponses-- pc.lk.Unlock()
リクエストの書き込みを
writech
に送信し、writeErrCh
とresc
の両方をselect
で同時に待機するようになりました。 -
markBroken
メソッドの追加:--- a/src/pkg/net/http/transport.go +++ b/src/pkg/net/http/transport.go @@ -669,6 +712,15 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err return re.res, re.err } +// markBroken marks a connection as broken (so it's not reused). +// It differs from close in that it doesn't close the underlying +// connection for use when it's still being read. +func (pc *persistConn) markBroken() { + pc.lk.Lock() + defer pc.lk.Unlock() + pc.broken = true +} + func (pc *persistConn) close() { pc.lk.Lock() defer pc.lk.Unlock()
コネクションを「壊れた」状態としてマークするためのヘルパーメソッドが追加されました。
src/pkg/net/http/transport_test.go
TestIssue3595
テストケースの追加:
このテストケースは、サーバーがリクエストボディを完全に読み取る前にレスポンスを返した場合でも、クライアントがそのレスポンスを正しく受け取れることを検証します。特に、--- a/src/pkg/net/http/transport_test.go +++ b/src/pkg/net/http/transport_test.go @@ -833,6 +833,30 @@ func TestIssue3644(t *testing.T) { } } +// Test that a client receives a server's reply, even if the server doesn't read +// the entire request body. +func TestIssue3595(t *testing.T) { + const deniedMsg = "sorry, denied." + ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) { + Error(w, deniedMsg, StatusUnauthorized) + })) + defer ts.Close() + tr := &Transport{} + c := &Client{Transport: tr} + res, err := c.Post(ts.URL, "application/octet-stream", neverEnding('a')) + if err != nil { + t.Errorf("Post: %v", err) + return + } + got, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Fatalf("Body ReadAll: %v", err) + } + if !strings.Contains(string(got), deniedMsg) { + t.Errorf("Known bug: response %q does not contain %q", got, deniedMsg) + } +} + type fooProto struct{} func (fooProto) RoundTrip(req *Request) (*Response, error) {
neverEnding('a')
という無限のボディを送信し、サーバーが401 Unauthorized
を返すシナリオをシミュレートしています。
これらの変更は、net/http
クライアントの堅牢性と信頼性を大幅に向上させるものです。
コアとなるコードの解説
このコミットのコアとなる変更は、net/http/transport.go
内の persistConn
構造体と、それに関連する roundTrip
、readLoop
、そして新しく追加された writeLoop
ゴルーチンの協調動作にあります。
persistConn
構造体
writech chan writeRequest
: この新しいチャネルは、roundTrip
メソッドからwriteLoop
ゴルーチンへのリクエスト書き込み要求を伝達するために使用されます。バッファリングされたチャネル(容量50)であるため、ある程度の要求をキューに入れることができます。
writeRequest
構造体
req *transportRequest
: 実際に書き込むHTTPリクエストの情報を保持します。ch chan<- error
: 書き込み操作の結果(エラーまたはnil)をroundTrip
メソッドにフィードバックするためのチャネルです。これにより、書き込みの成功/失敗をroundTrip
が知ることができます。
getConn
関数
persistConn
のインスタンスが作成される際に、writech
が初期化され、go pconn.writeLoop()
が呼び出されます。これにより、コネクションごとに専用のwriteLoop
ゴルーチンが起動され、リクエストの書き込みを非同期で処理する準備が整います。
writeLoop
ゴルーチン
このゴルーチンは、リクエストボディの実際の書き込みを担当します。
func (pc *persistConn) writeLoop() {
for wr := range pc.writech { // (1) writech から書き込み要求を読み取る
if pc.isBroken() { // (2) コネクションが壊れていないかチェック
wr.ch <- errors.New("http: can't write HTTP request on broken connection")
continue
}
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む
if err == nil {
err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信
}
if err != nil {
pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする
}
wr.ch <- err // (6) 結果を roundTrip に返す
}
}
for wr := range pc.writech
:writech
からwriteRequest
を受け取るまでブロックします。チャネルがクローズされるとループを終了します。if pc.isBroken()
: コネクションがすでに壊れている場合、書き込みは行わず、エラーを返します。wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra)
: HTTPリクエストのヘッダーとボディをbufio.Writer
(pc.bw
) に書き込みます。この時点ではまだ物理的なネットワークには送信されません。err = pc.bw.Flush()
:pc.bw
にバッファリングされたデータを物理的なネットワークコネクションに送信します。ここで実際にデータがサーバーに送られます。if err != nil { pc.markBroken() }
: 書き込みまたはフラッシュ中にエラーが発生した場合、そのコネクションを「壊れた」状態としてマークします。これにより、このコネクションが将来のHTTPリクエストに再利用されるのを防ぎます。wr.ch <- err
: 書き込み操作の結果(エラーまたはnil)を、writeRequest
に含まれるチャネルを通じてroundTrip
ゴルーチンに返します。
roundTrip
メソッド
このメソッドは、単一のHTTPリクエストを送信し、レスポンスを受け取るクライアント側の主要なロジックを含んでいます。
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
// ... (前略) ...
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル
pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信
resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル
pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信
var re responseAndError
WaitResponse:
for {
select { // (5) writeErrCh と resc の両方を同時に待機
case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合
if err != nil {
re = responseAndError{nil, err}
break WaitResponse
}
case re = <-resc: // (7) レスポンスが到着した場合
break WaitResponse
}
}
// ... (後略) ...
return re.res, re.err
}
writeErrCh := make(chan error, 1)
:writeLoop
から書き込みエラーを受け取るためのチャネルを作成します。バッファリングされているため、writeLoop
はroundTrip
がチャネルを読み取るのを待たずにエラーを送信できます。pc.writech <- writeRequest{req, writeErrCh}
:writeRequest
を作成し、pc.writech
に送信します。これにより、writeLoop
ゴルーチンがこのリクエストの書き込みを開始します。この操作は非同期で行われます。resc := make(chan responseAndError, 1)
:readLoop
からサーバーレスポンスを受け取るためのチャネルを作成します。pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
:readLoop
ゴルーチンに、このリクエストに対するレスポンスを待機するよう指示します。select { ... }
: ここがこの変更の最も重要な部分です。roundTrip
は、writeErrCh
とresc
の両方を同時に待機します。case err := <-writeErrCh:
: もしwriteLoop
がリクエストの書き込み中にエラー(例: "broken pipe")を検出してwriteErrCh
に送信した場合、このケースが実行されます。roundTrip
はそのエラーを即座に処理し、レスポンスの待機を中断してエラーを返します。case re = <-resc:
: もしreadLoop
がサーバーからのレスポンスを先に受け取ってresc
に送信した場合、このケースが実行されます。roundTrip
はレスポンスを処理し、書き込みエラーの発生を待たずにレスポンスを返します。
markBroken
メソッド
pc.broken = true
: コネクションが再利用できない状態であることを示すフラグを設定します。- このメソッドは、基盤となるTCPコネクションをすぐに閉じません。これは、
readLoop
がまだそのコネクションから残りのデータを読み取っている可能性があるためです。コネクションの実際のクローズは、readLoop
が終了するか、または他のメカニズムによって行われます。
これらの変更により、net/http
クライアントは、リクエストの書き込みとレスポンスの読み込みを真に並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。
関連リンク
- Go CL (Change List): https://golang.org/cl/6238043
- GitHub Issue #3595: https://github.com/golang/go/issues/3595
参考にした情報源リンク
- Go言語の公式ドキュメント:
net/http
パッケージ - Go言語の並行処理に関するドキュメント(Goroutines, Channels, Select)
- HTTP/1.1 プロトコル仕様 (RFC 2616 または RFC 7230-7235)
- TCP/IP プロトコルに関する一般的な知識
bufio
パッケージのドキュメントio
パッケージのドキュメント- Go言語のテストフレームワーク
testing
パッケージのドキュメント httptest
パッケージのドキュメント- Web検索結果: "Go net/http Issue 3595" (このコミットが修正した問題に関連する議論や、
rstAvoidanceDelay
などの関連概念について言及されている可能性があります。)