Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

[インデックス 13364] ファイルの概要

このコミットは、Go言語の標準ライブラリ net/http パッケージにおけるクライアントの挙動を改善するものです。具体的には、HTTPリクエストのボディを送信している最中にサーバーからレスポンスが返された場合(例えば、認証エラーなど)、以前はクライアントがそのレスポンスを適切に処理できず、書き込みエラー("broken pipe"など)として扱ってしまう問題がありました。このコミットにより、リクエストの書き込みとレスポンスの読み込みが並行して行われるようになり、このようなシナリオでもサーバーからのレスポンスを正確に受け取れるようになりました。

コミット

commit a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
Author: Brad Fitzpatrick <bradfitz@golang.org>
Date:   Tue Jun 19 09:20:41 2012 -0700

    net/http: make client await response concurrently with writing request
    
    If the server replies with an HTTP response before we're done
    writing our body (for instance "401 Unauthorized" response), we
    were previously ignoring that, since we returned our write
    error ("broken pipe", etc) before ever reading the response.
    Now we read and write at the same time.
    
    Fixes #3595
    
    R=rsc, adg
    CC=golang-dev
    https://golang.org/cl/6238043

GitHub上でのコミットページへのリンク

https://github.com/golang/go/commit/a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4

元コミット内容

net/http: クライアントがリクエストの書き込みと並行してレスポンスを待機するようにする

もしサーバーがリクエストボディの書き込みが完了する前にHTTPレスポンスを返した場合(例えば「401 Unauthorized」レスポンスなど)、以前はクライアントがそれを無視していました。なぜなら、レスポンスを読み込む前に書き込みエラー(「broken pipe」など)を返していたからです。 これで、読み込みと書き込みが同時に行われるようになります。

Fixes #3595

R=rsc, adg CC=golang-dev https://golang.org/cl/6238043

変更の背景

この変更の背景には、HTTPクライアントがリクエストボディをサーバーに送信している途中で、サーバーが早期にレスポンスを返すという特定のシナリオにおける問題がありました。

従来の net/http クライアントの実装では、HTTPリクエストの送信(特にボディの書き込み)とサーバーからのレスポンスの受信が、ある程度シーケンシャルに行われていました。つまり、クライアントはまずリクエスト全体を書き込もうとし、その書き込みが完了するか、あるいはエラーが発生するまで、サーバーからのレスポンスを待機しませんでした。

このシーケンシャルな挙動が問題となるのは、以下のようなケースです。

  1. 早期レスポンス: サーバーがリクエストボディの全てを受け取る前に、何らかの理由でレスポンスを返す場合。例えば、認証が必要なエンドポイントに対して認証情報なしで大きなボディをPOSTした場合、サーバーはボディの受信を待たずに「401 Unauthorized」のようなエラーレスポンスをすぐに返すことがあります。
  2. 「broken pipe」問題: 従来のクライアントは、リクエストボディの書き込み中にサーバーが接続を切断した場合(早期レスポンスを返した後など)、write システムコールが「broken pipe」エラーを返していました。クライアントはこの書き込みエラーを即座に処理し、レスポンスを読み込むことなくエラーを返していました。
  3. 真のレスポンスの喪失: 結果として、クライアントはサーバーが意図した「401 Unauthorized」のような意味のあるレスポンスを受け取ることができず、単なる書き込みエラーとして処理してしまい、問題の根本原因を特定するのが困難でした。

このコミットは、このような状況下でもクライアントがサーバーからの早期レスポンスを適切に受け取り、処理できるようにするために導入されました。これにより、クライアントはより堅牢になり、サーバーの挙動に柔軟に対応できるようになります。

前提知識の解説

このコミットを理解するためには、以下の技術的知識が役立ちます。

  1. HTTPプロトコル:

    • リクエスト/レスポンスモデル: クライアントがリクエストを送信し、サーバーがレスポンスを返すという基本的なHTTPの通信モデル。
    • リクエストボディ: POSTやPUTなどのメソッドで、クライアントがサーバーに送信するデータ本体。
    • レスポンスステータスコード: サーバーがリクエストの結果を示す3桁の数字(例: 200 OK, 401 Unauthorized, 500 Internal Server Error)。
    • TCPソケット: HTTP通信の基盤となるトランスポート層のプロトコル。データはストリームとして送受信されます。
  2. Go言語の並行処理:

    • Goroutine (ゴルーチン): Go言語における軽量なスレッドのようなもの。非常に低コストで多数生成でき、並行処理を実現します。
    • Channel (チャネル): Goroutine間で安全にデータを送受信するための通信メカニズム。チャネルを介した通信は同期的に行われるため、データ競合を防ぎます。
    • select ステートメント: 複数のチャネル操作を同時に待機し、準備ができたチャネル操作を実行するためのGoの構文。非同期処理の調整に不可欠です。
  3. net/http パッケージ:

    • Transport: net/http パッケージにおいて、実際のHTTPリクエストの送信とレスポンスの受信を担当するインターフェース。コネクションの管理、リクエストの多重化、プロキシの処理などを行います。
    • persistConn: Transport の内部で、単一のTCPコネクションを介して複数のHTTPリクエスト/レスポンスを処理するための構造体。HTTP/1.1のKeep-Alive機能を実現するために使用されます。
    • readLoop: persistConn 内で、サーバーからのレスポンスを継続的に読み込むためのGoroutine。
    • roundTrip: Transport インターフェースの主要なメソッドで、単一のHTTPリクエストを送信し、対応するレスポンスを受け取る一連の処理をカプセル化します。
  4. I/O操作とバッファリング:

    • io.Reader / io.Writer: Go言語における基本的なI/Oインターフェース。データの読み書き操作を抽象化します。
    • bufio.Reader / bufio.Writer: バッファリングされたI/Oを提供する構造体。これにより、小さなI/O操作が多数発生するのを防ぎ、効率を向上させます。Flush() メソッドはバッファの内容を実際に書き込み先に送るために重要です。
    • 「broken pipe」エラー: パイプやソケットの書き込み側が、読み込み側がすでにクローズされている状態で書き込みを試みた場合に発生するエラー。TCPソケット通信では、サーバーが接続を閉じた後にクライアントが書き込みを続けると発生します。

これらの概念を理解することで、コミットがどのようにして既存の課題を解決し、net/http クライアントの堅牢性を向上させたのかを深く把握できます。

技術的詳細

このコミットの核心は、HTTPクライアントがリクエストの書き込みとレスポンスの読み込みを並行して行うための新しいメカニズムを導入した点にあります。これにより、サーバーがリクエストボディの受信完了を待たずに早期にレスポンスを返した場合でも、クライアントがそのレスポンスを適切に処理できるようになります。

具体的な技術的変更点は以下の通りです。

  1. writech チャネルの導入:

    • persistConn 構造体に writech chan writeRequest という新しいチャネルが追加されました。このチャネルは、roundTrip ゴルーチンからリクエストの書き込み要求を受け取り、writeLoop ゴルーチンに渡す役割を担います。
    • writeRequest は、書き込むリクエスト (*transportRequest) と、書き込み結果を返すためのチャネル (chan<- error) を含む新しい構造体です。
  2. writeLoop ゴルーチンの導入:

    • persistConn の初期化時(getConn メソッド内)に、go pconn.writeLoop() という新しいゴルーチンが起動されるようになりました。
    • この writeLoop ゴルーチンは、pc.writech から writeRequest を継続的に読み取ります。
    • writeRequest を受け取ると、その中のリクエスト (wr.req.Request) を pc.bw (バッファリングされたライター) を使って実際にコネクションに書き込みます。
    • 書き込みが完了したら、pc.bw.Flush() を呼び出してバッファの内容を物理的なコネクションに送信します。
    • 書き込み中にエラーが発生した場合(例: サーバーが接続を閉じたことによる「broken pipe」)、pc.markBroken() を呼び出してコネクションを「壊れた」状態としてマークし、そのエラーを writeRequest に含まれるチャネル (wr.ch) を通じて roundTrip ゴルーチンに返します。
  3. roundTrip メソッドの変更:

    • 以前は roundTrip メソッド内でリクエストの書き込みとフラッシュを直接行い、その後レスポンスを待機していました。
    • 変更後、リクエストの書き込みは writeLoop ゴルーチンに委譲されるようになりました。具体的には、writeRequest を作成し、pc.writech に送信します。
    • 最も重要な変更は、select ステートメントの導入です。roundTrip は、writeErrCh (書き込みエラーを受け取るチャネル) と resc (レスポンスを受け取るチャネル) の両方を同時に待機するようになりました。
    • これにより、リクエストの書き込みが完了する前にサーバーからレスポンスが返された場合でも、resc チャネルからのイベントが先に発生し、クライアントは早期レスポンスを即座に処理できるようになります。
    • もし書き込みエラーが先に発生した場合、そのエラーが優先され、レスポンスの待機は中断されます。
  4. markBroken メソッドの導入:

    • persistConnmarkBroken() という新しいメソッドが追加されました。これは、コネクションが再利用できない状態になったことをマークしますが、基盤となるコネクション自体はすぐに閉じません。これは、readLoop がまだそのコネクションからデータを読み取っている可能性があるためです。

これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みという2つの独立したタスクを並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。

コアとなるコードの変更箇所

このコミットによる主要なコード変更は、src/pkg/net/http/transport.gosrc/pkg/net/http/transport_test.go の2つのファイルに集中しています。

src/pkg/net/http/transport.go

  1. persistConn 構造体への writech の追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -487,7 +489,8 @@ type persistConn struct {
     	closed   bool                // whether conn has been closed
     	br       *bufio.Reader       // from conn
     	bw       *bufio.Writer       // to conn
    -	reqch    chan requestAndChan // written by roundTrip(); read by readLoop()
    +	reqch    chan requestAndChan // written by roundTrip; read by readLoop
    +	writech  chan writeRequest   // written by roundTrip; read by writeLoop
     	isProxy  bool
    

    persistConnwritech チャネルが追加され、リクエストの書き込み要求を writeLoop に送るために使用されます。

  2. getConnwritech の初期化と writeLoop の起動:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -323,6 +323,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
     		cacheKey: cm.String(),
     		conn:     conn,
     		reqch:    make(chan requestAndChan, 50),\n
    +		writech:  make(chan writeRequest, 50),
     	}
     
     	switch {
    @@ -380,6 +381,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
     	pconn.br = bufio.NewReader(pconn.conn)
     	pconn.bw = bufio.NewWriter(pconn.conn)
     	go pconn.readLoop()
    +	go pconn.writeLoop()
     	return pconn, nil
     }
    

    新しいコネクションが確立される際に、writech が初期化され、writeLoop ゴルーチンが起動されます。

  3. readLoopwritech をクローズ:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -519,6 +522,7 @@ func remoteSideClosed(err error) bool {
     }
     
     func (pc *persistConn) readLoop() {
    +	defer close(pc.writech)
     	alive := true
     	var lastbody io.ReadCloser // last response body, if any, read on this connection
    

    readLoop が終了する際に writech をクローズすることで、writeLoop が適切に終了できるようにします。

  4. writeLoop ゴルーチンの追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -615,6 +619,23 @@ func (pc *persistConn) readLoop() {
     	}
     }
     
    +func (pc *persistConn) writeLoop() {
    +	for wr := range pc.writech {
    +		if pc.isBroken() {
    +			wr.ch <- errors.New("http: can't write HTTP request on broken connection")
    +			continue
    +		}
    +		err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra)
    +		if err == nil {
    +			err = pc.bw.Flush()
    +		}
    +		if err != nil {
    +			pc.markBroken()
    +		}
    +		wr.ch <- err
    +	}
    +}
    +
     type responseAndError struct {
      res *Response
      err error
    

    リクエストの書き込みとフラッシュを担当する新しいゴルーチン writeLoop が追加されました。

  5. writeRequest 構造体の追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -630,6 +651,15 @@ type requestAndChan struct {
      addedGzip bool
     }
     
    +// A writeRequest is sent by the readLoop's goroutine to the
    +// writeLoop's goroutine to write a request while the read loop
    +// concurrently waits on both the write response and the server's
    +// reply.
    +type writeRequest struct {
    +	req *transportRequest
    +	ch  chan<- error
    +}
    +
     func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
      if pc.mutateHeaderFunc != nil {
       pc.mutateHeaderFunc(req.extraHeaders())
    

    writeLooproundTrip 間で書き込み要求をやり取りするための writeRequest 構造体が定義されました。

  6. roundTrip メソッドの並行処理ロジックの変更:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -652,16 +682,29 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
      pc.numExpectedResponses++
      pc.lk.Unlock()
     
    -	err = req.Request.write(pc.bw, pc.isProxy, req.extra)
    -	if err != nil {
    -		pc.close()
    -		return
    +	// Write the request concurrently with waiting for a response,
    +	// in case the server decides to reply before reading our full
    +	// request body.
    +	writeErrCh := make(chan error, 1)
    +	pc.writech <- writeRequest{req, writeErrCh}
    +
    +	resc := make(chan responseAndError, 1)
    +	pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
    +
    +	var re responseAndError
    +WaitResponse:
    +	for {
    +		select {
    +		case err := <-writeErrCh:
    +			if err != nil {
    +				re = responseAndError{nil, err}
    +				break WaitResponse
    +			}
    +		case re = <-resc:
    +			break WaitResponse
    +		}
      }
    -	pc.bw.Flush()
     
    -	ch := make(chan responseAndError, 1)
    -	pc.reqch <- requestAndChan{req.Request, ch, requestedGzip}
    -	re := <-ch
      pc.lk.Lock()
      pc.numExpectedResponses--
      pc.lk.Unlock()
    

    リクエストの書き込みを writech に送信し、writeErrChresc の両方を select で同時に待機するようになりました。

  7. markBroken メソッドの追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -669,6 +712,15 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
      return re.res, re.err
     }
     
    +// markBroken marks a connection as broken (so it's not reused).
    +// It differs from close in that it doesn't close the underlying
    +// connection for use when it's still being read.
    +func (pc *persistConn) markBroken() {
    +	pc.lk.Lock()
    +	defer pc.lk.Unlock()
    +	pc.broken = true
    +}
    +
     func (pc *persistConn) close() {
      pc.lk.Lock()
      defer pc.lk.Unlock()
    

    コネクションを「壊れた」状態としてマークするためのヘルパーメソッドが追加されました。

src/pkg/net/http/transport_test.go

  1. TestIssue3595 テストケースの追加:
    --- a/src/pkg/net/http/transport_test.go
    +++ b/src/pkg/net/http/transport_test.go
    @@ -833,6 +833,30 @@ func TestIssue3644(t *testing.T) {
      }
     }
     
    +// Test that a client receives a server's reply, even if the server doesn't read
    +// the entire request body.
    +func TestIssue3595(t *testing.T) {
    +	const deniedMsg = "sorry, denied."
    +	ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
    +		Error(w, deniedMsg, StatusUnauthorized)
    +	}))
    +	defer ts.Close()
    +	tr := &Transport{}
    +	c := &Client{Transport: tr}
    +	res, err := c.Post(ts.URL, "application/octet-stream", neverEnding('a'))
    +	if err != nil {
    +		t.Errorf("Post: %v", err)
    +		return
    +	}
    +	got, err := ioutil.ReadAll(res.Body)
    +	if err != nil {
    +		t.Fatalf("Body ReadAll: %v", err)
    +	}
    +	if !strings.Contains(string(got), deniedMsg) {
    +		t.Errorf("Known bug: response %q does not contain %q", got, deniedMsg)
    +	}
    +}
    +
     type fooProto struct{}
     
     func (fooProto) RoundTrip(req *Request) (*Response, error) {
    
    このテストケースは、サーバーがリクエストボディを完全に読み取る前にレスポンスを返した場合でも、クライアントがそのレスポンスを正しく受け取れることを検証します。特に、neverEnding('a') という無限のボディを送信し、サーバーが 401 Unauthorized を返すシナリオをシミュレートしています。

これらの変更は、net/http クライアントの堅牢性と信頼性を大幅に向上させるものです。

コアとなるコードの解説

このコミットのコアとなる変更は、net/http/transport.go 内の persistConn 構造体と、それに関連する roundTripreadLoop、そして新しく追加された writeLoop ゴルーチンの協調動作にあります。

persistConn 構造体

  • writech chan writeRequest: この新しいチャネルは、roundTrip メソッドから writeLoop ゴルーチンへのリクエスト書き込み要求を伝達するために使用されます。バッファリングされたチャネル(容量50)であるため、ある程度の要求をキューに入れることができます。

writeRequest 構造体

  • req *transportRequest: 実際に書き込むHTTPリクエストの情報を保持します。
  • ch chan<- error: 書き込み操作の結果(エラーまたはnil)を roundTrip メソッドにフィードバックするためのチャネルです。これにより、書き込みの成功/失敗を roundTrip が知ることができます。

getConn 関数

  • persistConn のインスタンスが作成される際に、writech が初期化され、go pconn.writeLoop() が呼び出されます。これにより、コネクションごとに専用の writeLoop ゴルーチンが起動され、リクエストの書き込みを非同期で処理する準備が整います。

writeLoop ゴルーチン

このゴルーチンは、リクエストボディの実際の書き込みを担当します。

func (pc *persistConn) writeLoop() {
	for wr := range pc.writech { // (1) writech から書き込み要求を読み取る
		if pc.isBroken() { // (2) コネクションが壊れていないかチェック
			wr.ch <- errors.New("http: can't write HTTP request on broken connection")
			continue
		}
		err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む
		if err == nil {
			err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信
		}
		if err != nil {
			pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする
		}
		wr.ch <- err // (6) 結果を roundTrip に返す
	}
}
  1. for wr := range pc.writech: writech から writeRequest を受け取るまでブロックします。チャネルがクローズされるとループを終了します。
  2. if pc.isBroken(): コネクションがすでに壊れている場合、書き込みは行わず、エラーを返します。
  3. wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra): HTTPリクエストのヘッダーとボディを bufio.Writer (pc.bw) に書き込みます。この時点ではまだ物理的なネットワークには送信されません。
  4. err = pc.bw.Flush(): pc.bw にバッファリングされたデータを物理的なネットワークコネクションに送信します。ここで実際にデータがサーバーに送られます。
  5. if err != nil { pc.markBroken() }: 書き込みまたはフラッシュ中にエラーが発生した場合、そのコネクションを「壊れた」状態としてマークします。これにより、このコネクションが将来のHTTPリクエストに再利用されるのを防ぎます。
  6. wr.ch <- err: 書き込み操作の結果(エラーまたはnil)を、writeRequest に含まれるチャネルを通じて roundTrip ゴルーチンに返します。

roundTrip メソッド

このメソッドは、単一のHTTPリクエストを送信し、レスポンスを受け取るクライアント側の主要なロジックを含んでいます。

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	// ... (前略) ...

	// Write the request concurrently with waiting for a response,
	// in case the server decides to reply before reading our full
	// request body.
	writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル
	pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信

	resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル
	pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信

	var re responseAndError
WaitResponse:
	for {
		select { // (5) writeErrCh と resc の両方を同時に待機
		case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合
			if err != nil {
				re = responseAndError{nil, err}
				break WaitResponse
			}
		case re = <-resc: // (7) レスポンスが到着した場合
			break WaitResponse
		}
	}

	// ... (後略) ...
	return re.res, re.err
}
  1. writeErrCh := make(chan error, 1): writeLoop から書き込みエラーを受け取るためのチャネルを作成します。バッファリングされているため、writeLooproundTrip がチャネルを読み取るのを待たずにエラーを送信できます。
  2. pc.writech <- writeRequest{req, writeErrCh}: writeRequest を作成し、pc.writech に送信します。これにより、writeLoop ゴルーチンがこのリクエストの書き込みを開始します。この操作は非同期で行われます。
  3. resc := make(chan responseAndError, 1): readLoop からサーバーレスポンスを受け取るためのチャネルを作成します。
  4. pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}: readLoop ゴルーチンに、このリクエストに対するレスポンスを待機するよう指示します。
  5. select { ... }: ここがこの変更の最も重要な部分です。roundTrip は、writeErrChresc の両方からのイベントを同時に待機します。
  6. case err := <-writeErrCh:: もし writeLoop がリクエストの書き込み中にエラー(例: "broken pipe")を検出して writeErrCh に送信した場合、このケースが実行されます。roundTrip はそのエラーを即座に処理し、レスポンスの待機を中断してエラーを返します。
  7. case re = <-resc:: もし readLoop がサーバーからのレスポンスを先に受け取って resc に送信した場合、このケースが実行されます。roundTrip はレスポンスを処理し、書き込みエラーの発生を待たずにレスポンスを返します。

markBroken メソッド

  • pc.broken = true: コネクションが再利用できない状態であることを示すフラグを設定します。
  • このメソッドは、基盤となるTCPコネクションをすぐに閉じません。これは、readLoop がまだそのコネクションから残りのデータを読み取っている可能性があるためです。コネクションの実際のクローズは、readLoop が終了するか、または他のメカニズムによって行われます。

これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みを真に並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。

関連リンク

参考にした情報源リンク

  • Go言語の公式ドキュメント: net/http パッケージ
  • Go言語の並行処理に関するドキュメント(Goroutines, Channels, Select)
  • HTTP/1.1 プロトコル仕様 (RFC 2616 または RFC 7230-7235)
  • TCP/IP プロトコルに関する一般的な知識
  • bufio パッケージのドキュメント
  • io パッケージのドキュメント
  • Go言語のテストフレームワーク testing パッケージのドキュメント
  • httptest パッケージのドキュメント

[インデックス 13364] ファイルの概要

このコミットは、Go言語の標準ライブラリ net/http パッケージにおけるクライアントの挙動を改善するものです。具体的には、HTTPリクエストのボディを送信している最中にサーバーからレスポンスが返された場合(例えば、認証エラーなど)、以前はクライアントがそのレスポンスを適切に処理できず、書き込みエラー("broken pipe"など)として扱ってしまう問題がありました。このコミットにより、リクエストの書き込みとレスポンスの読み込みが並行して行われるようになり、このようなシナリオでもサーバーからのレスポンスを正確に受け取れるようになりました。

コミット

commit a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
Author: Brad Fitzpatrick <bradfitz@golang.org>
Date:   Tue Jun 19 09:20:41 2012 -0700

    net/http: make client await response concurrently with writing request
    
    If the server replies with an HTTP response before we're done
    writing our body (for instance "401 Unauthorized" response), we
    were previously ignoring that, since we returned our write
    error ("broken pipe", etc) before ever reading the response.
    Now we read and write at the same time.
    
    Fixes #3595
    
    R=rsc, adg
    CC=golang-dev
    https://golang.org/cl/6238043

GitHub上でのコミットページへのリンク

https://github.com/golang/go/commit/a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4

元コミット内容

net/http: クライアントがリクエストの書き込みと並行してレスポンスを待機するようにする

もしサーバーがリクエストボディの書き込みが完了する前にHTTPレスポンスを返した場合(例えば「401 Unauthorized」レスポンスなど)、以前はクライアントがそれを無視していました。なぜなら、レスポンスを読み込む前に書き込みエラー(「broken pipe」など)を返していたからです。 これで、読み込みと書き込みが同時に行われるようになります。

Fixes #3595

R=rsc, adg CC=golang-dev https://golang.org/cl/6238043

変更の背景

この変更の背景には、HTTPクライアントがリクエストボディをサーバーに送信している途中で、サーバーが早期にレスポンスを返すという特定のシナリオにおける問題がありました。

従来の net/http クライアントの実装では、HTTPリクエストの送信(特にボディの書き込み)とサーバーからのレスポンスの受信が、ある程度シーケンシャルに行われていました。つまり、クライアントはまずリクエスト全体を書き込もうとし、その書き込みが完了するか、あるいはエラーが発生するまで、サーバーからのレスポンスを待機しませんでした。

このシーケンシャルな挙動が問題となるのは、以下のようなケースです。

  1. 早期レスポンス: サーバーがリクエストボディの全てを受け取る前に、何らかの理由でレスポンスを返す場合。例えば、認証が必要なエンドポイントに対して認証情報なしで大きなボディをPOSTした場合、サーバーはボディの受信を待たずに「401 Unauthorized」のようなエラーレスポンスをすぐに返すことがあります。
  2. 「broken pipe」問題: 従来のクライアントは、リクエストボディの書き込み中にサーバーが接続を切断した場合(早期レスポンスを返した後など)、write システムコールが「broken pipe」エラーを返していました。クライアントはこの書き込みエラーを即座に処理し、レスポンスを読み込むことなくエラーを返していました。
  3. 真のレスポンスの喪失: 結果として、クライアントはサーバーが意図した「401 Unauthorized」のような意味のあるレスポンスを受け取ることができず、単なる書き込みエラーとして処理してしまい、問題の根本原因を特定するのが困難でした。

このコミットは、このような状況下でもクライアントがサーバーからの早期レスポンスを適切に受け取り、処理できるようにするために導入されました。これにより、クライアントはより堅牢になり、サーバーの挙動に柔軟に対応できるようになります。

前提知識の解説

このコミットを理解するためには、以下の技術的知識が役立ちます。

  1. HTTPプロトコル:

    • リクエスト/レスポンスモデル: クライアントがリクエストを送信し、サーバーがレスポンスを返すという基本的なHTTPの通信モデル。
    • リクエストボディ: POSTやPUTなどのメソッドで、クライアントがサーバーに送信するデータ本体。
    • レスポンスステータスコード: サーバーがリクエストの結果を示す3桁の数字(例: 200 OK, 401 Unauthorized, 500 Internal Server Error)。
    • TCPソケット: HTTP通信の基盤となるトランスポート層のプロトコル。データはストリームとして送受信されます。
  2. Go言語の並行処理:

    • Goroutine (ゴルーチン): Go言語における軽量なスレッドのようなもの。非常に低コストで多数生成でき、並行処理を実現します。
    • Channel (チャネル): Goroutine間で安全にデータを送受信するための通信メカニズム。チャネルを介した通信は同期的に行われるため、データ競合を防ぎます。
    • select ステートメント: 複数のチャネル操作を同時に待機し、準備ができたチャネル操作を実行するためのGoの構文。非同期処理の調整に不可欠です。
  3. net/http パッケージ:

    • Transport: net/http パッケージにおいて、実際のHTTPリクエストの送信とレスポンスの受信を担当するインターフェース。コネクションの管理、リクエストの多重化、プロキシの処理などを行います。
    • persistConn: Transport の内部で、単一のTCPコネクションを介して複数のHTTPリクエスト/レスポンスを処理するための構造体。HTTP/1.1のKeep-Alive機能を実現するために使用されます。
    • readLoop: persistConn 内で、サーバーからのレスポンスを継続的に読み込むためのGoroutine。
    • roundTrip: Transport インターフェースの主要なメソッドで、単一のHTTPリクエストを送信し、対応するレスポンスを受け取る一連の処理をカプセル化します。
  4. I/O操作とバッファリング:

    • io.Reader / io.Writer: Go言語における基本的なI/Oインターフェース。データの読み書き操作を抽象化します。
    • bufio.Reader / bufio.Writer: バッファリングされたI/Oを提供する構造体。これにより、小さなI/O操作が多数発生するのを防ぎ、効率を向上させます。Flush() メソッドはバッファの内容を実際に書き込み先に送るために重要です。
    • 「broken pipe」エラー: パイプやソケットの書き込み側が、読み込み側がすでにクローズされている状態で書き込みを試みた場合に発生するエラー。TCPソケット通信では、サーバーが接続を閉じた後にクライアントが書き込みを続けると発生します。

これらの概念を理解することで、コミットがどのようにして既存の課題を解決し、net/http クライアントの堅牢性を向上させたのかを深く把握できます。

技術的詳細

このコミットの核心は、HTTPクライアントがリクエストの書き込みとレスポンスの読み込みを並行して行うための新しいメカニズムを導入した点にあります。これにより、サーバーがリクエストボディの受信完了を待たずに早期にレスポンスを返した場合でも、クライアントがそのレスポンスを適切に処理できるようになります。

具体的な技術的変更点は以下の通りです。

  1. writech チャネルの導入:

    • persistConn 構造体に writech chan writeRequest という新しいチャネルが追加されました。このチャネルは、roundTrip ゴルーチンからリクエストの書き込み要求を受け取り、writeLoop ゴルーチンに渡す役割を担います。
    • writeRequest は、書き込むリクエスト (*transportRequest) と、書き込み結果を返すためのチャネル (chan<- error) を含む新しい構造体です。
  2. writeLoop ゴルーチンの導入:

    • persistConn の初期化時(getConn メソッド内)に、go pconn.writeLoop() という新しいゴルーチンが起動されるようになりました。
    • この writeLoop ゴルーチンは、pc.writech から writeRequest を継続的に読み取ります。
    • writeRequest を受け取ると、その中のリクエスト (wr.req.Request) を pc.bw (バッファリングされたライター) を使って実際にコネクションに書き込みます。
    • 書き込みが完了したら、pc.bw.Flush() を呼び出してバッファの内容を物理的なコネクションに送信します。
    • 書き込み中にエラーが発生した場合(例: サーバーが接続を閉じたことによる「broken pipe」)、pc.markBroken() を呼び出してコネクションを「壊れた」状態としてマークし、そのエラーを writeRequest に含まれるチャネル (wr.ch) を通じて roundTrip ゴルーチンに返します。
  3. roundTrip メソッドの変更:

    • 以前は roundTrip メソッド内でリクエストの書き込みとフラッシュを直接行い、その後レスポンスを待機していました。
    • 変更後、リクエストの書き込みは writeLoop ゴルーチンに委譲されるようになりました。具体的には、writeRequest を作成し、pc.writech に送信します。
    • 最も重要な変更は、select ステートメントの導入です。roundTrip は、writeErrCh (書き込みエラーを受け取るチャネル) と resc (レスポンスを受け取るチャネル) の両方を同時に待機するようになりました。
    • これにより、リクエストの書き込みが完了する前にサーバーからレスポンスが返された場合でも、resc チャネルからのイベントが先に発生し、クライアントは早期レスポンスを即座に処理できるようになります。
    • もし書き込みエラーが先に発生した場合、そのエラーが優先され、レスポンスの待機は中断されます。
  4. markBroken メソッドの導入:

    • persistConnmarkBroken() という新しいメソッドが追加されました。これは、コネクションが再利用できない状態になったことをマークしますが、基盤となるコネクション自体はすぐに閉じません。これは、readLoop がまだそのコネクションからデータを読み取っている可能性があるためです。

これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みという2つの独立したタスクを並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。

コアとなるコードの変更箇所

このコミットによる主要なコード変更は、src/pkg/net/http/transport.gosrc/pkg/net/http/transport_test.go の2つのファイルに集中しています。

src/pkg/net/http/transport.go

  1. persistConn 構造体への writech の追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -487,7 +489,8 @@ type persistConn struct {
     	closed   bool                // whether conn has been closed
     	br       *bufio.Reader       // from conn
     	bw       *bufio.Writer       // to conn
    -	reqch    chan requestAndChan // written by roundTrip(); read by readLoop()
    +	reqch    chan requestAndChan // written by roundTrip; read by readLoop
    +	writech  chan writeRequest   // written by roundTrip; read by writeLoop
     	isProxy  bool
    

    persistConnwritech チャネルが追加され、リクエストの書き込み要求を writeLoop に送るために使用されます。

  2. getConnwritech の初期化と writeLoop の起動:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -323,6 +323,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
     		cacheKey: cm.String(),
     		conn:     conn,
     		reqch:    make(chan requestAndChan, 50),\n
    +		writech:  make(chan writeRequest, 50),
     	}
     
     	switch {
    @@ -380,6 +381,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
     	pconn.br = bufio.NewReader(pconn.conn)
     	pconn.bw = bufio.NewWriter(pconn.conn)
     	go pconn.readLoop()
    +	go pconn.writeLoop()
     	return pconn, nil
     }
    

    新しいコネクションが確立される際に、writech が初期化され、writeLoop ゴルーチンが起動されます。

  3. readLoopwritech をクローズ:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -519,6 +522,7 @@ func remoteSideClosed(err error) bool {
     }
     
     func (pc *persistConn) readLoop() {
    +	defer close(pc.writech)
     	alive := true
     	var lastbody io.ReadCloser // last response body, if any, read on this connection
    

    readLoop が終了する際に writech をクローズすることで、writeLoop が適切に終了できるようにします。

  4. writeLoop ゴルーチンの追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -615,6 +619,23 @@ func (pc *persistConn) readLoop() {
     	}
     }
     
    +func (pc *persistConn) writeLoop() {
    +	for wr := range pc.writech { // (1) writech から書き込み要求を読み取る
    +		if pc.isBroken() { // (2) コネクションが壊れていないかチェック
    +			wr.ch <- errors.New("http: can't write HTTP request on broken connection")
    +			continue
    +		}
    +		err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む
    +		if err == nil {
    +			err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信
    +		}
    +		if err != nil {
    +			pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする
    +		}
    +		wr.ch <- err // (6) 結果を roundTrip に返す
    +	}
    +}
    +
     type responseAndError struct {
      res *Response
      err error
    

    リクエストの書き込みとフラッシュを担当する新しいゴルーチン writeLoop が追加されました。

  5. writeRequest 構造体の追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -630,6 +651,15 @@ type requestAndChan struct {
      addedGzip bool
     }
     
    +// A writeRequest is sent by the readLoop's goroutine to the
    +// writeLoop's goroutine to write a request while the read loop
    +// concurrently waits on both the write response and the server's
    +// reply.
    +type writeRequest struct {
    +	req *transportRequest
    +	ch  chan<- error
    +}
    +
     func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
      if pc.mutateHeaderFunc != nil {
       pc.mutateHeaderFunc(req.extraHeaders())
    

    writeLooproundTrip 間で書き込み要求をやり取りするための writeRequest 構造体が定義されました。

  6. roundTrip メソッドの並行処理ロジックの変更:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -652,16 +682,29 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
      pc.numExpectedResponses++
      pc.lk.Unlock()
     
    -	err = req.Request.write(pc.bw, pc.isProxy, req.extra)
    -	if err != nil {
    -		pc.close()
    -		return
    +	// Write the request concurrently with waiting for a response,
    +	// in case the server decides to reply before reading our full
    +	// request body.
    +	writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル
    +	pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信
    +
    +	resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル
    +	pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信
    +
    +	var re responseAndError
    +WaitResponse:
    +	for {
    +		select { // (5) writeErrCh と resc の両方を同時に待機
    +		case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合
    +			if err != nil {
    +				re = responseAndError{nil, err}
    +				break WaitResponse
    +			}
    +		case re = <-resc: // (7) レスポンスが到着した場合
    +			break WaitResponse
    +		}
      }
    -	pc.bw.Flush()
     
    -	ch := make(chan responseAndError, 1)
    -	pc.reqch <- requestAndChan{req.Request, ch, requestedGzip}
    -	re := <-ch
      pc.lk.Lock()
      pc.numExpectedResponses--
      pc.lk.Unlock()
    

    リクエストの書き込みを writech に送信し、writeErrChresc の両方を select で同時に待機するようになりました。

  7. markBroken メソッドの追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -669,6 +712,15 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
      return re.res, re.err
     }
     
    +// markBroken marks a connection as broken (so it's not reused).
    +// It differs from close in that it doesn't close the underlying
    +// connection for use when it's still being read.
    +func (pc *persistConn) markBroken() {
    +	pc.lk.Lock()
    +	defer pc.lk.Unlock()
    +	pc.broken = true
    +}
    +
     func (pc *persistConn) close() {
      pc.lk.Lock()
      defer pc.lk.Unlock()
    

    コネクションを「壊れた」状態としてマークするためのヘルパーメソッドが追加されました。

src/pkg/net/http/transport_test.go

  1. TestIssue3595 テストケースの追加:
    --- a/src/pkg/net/http/transport_test.go
    +++ b/src/pkg/net/http/transport_test.go
    @@ -833,6 +833,30 @@ func TestIssue3644(t *testing.T) {
      }
     }
     
    +// Test that a client receives a server's reply, even if the server doesn't read
    +// the entire request body.
    +func TestIssue3595(t *testing.T) {
    +	const deniedMsg = "sorry, denied."
    +	ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
    +		Error(w, deniedMsg, StatusUnauthorized)
    +	}))
    +	defer ts.Close()
    +	tr := &Transport{}
    +	c := &Client{Transport: tr}
    +	res, err := c.Post(ts.URL, "application/octet-stream", neverEnding('a'))
    +	if err != nil {
    +		t.Errorf("Post: %v", err)
    +		return
    +	}
    +	got, err := ioutil.ReadAll(res.Body)
    +	if err != nil {
    +		t.Fatalf("Body ReadAll: %v", err)
    +	}
    +	if !strings.Contains(string(got), deniedMsg) {
    +		t.Errorf("Known bug: response %q does not contain %q", got, deniedMsg)
    +	}
    +}
    +
     type fooProto struct{}
     
     func (fooProto) RoundTrip(req *Request) (*Response, error) {
    
    このテストケースは、サーバーがリクエストボディを完全に読み取る前にレスポンスを返した場合でも、クライアントがそのレスポンスを正しく受け取れることを検証します。特に、neverEnding('a') という無限のボディを送信し、サーバーが 401 Unauthorized を返すシナリオをシミュレートしています。

これらの変更は、net/http クライアントの堅牢性と信頼性を大幅に向上させるものです。

コアとなるコードの解説

このコミットのコアとなる変更は、net/http/transport.go 内の persistConn 構造体と、それに関連する roundTripreadLoop、そして新しく追加された writeLoop ゴルーチンの協調動作にあります。

persistConn 構造体

  • writech chan writeRequest: この新しいチャネルは、roundTrip メソッドから writeLoop ゴルーチンへのリクエスト書き込み要求を伝達するために使用されます。バッファリングされたチャネル(容量50)であるため、ある程度の要求をキューに入れることができます。

writeRequest 構造体

  • req *transportRequest: 実際に書き込むHTTPリクエストの情報を保持します。
  • ch chan<- error: 書き込み操作の結果(エラーまたはnil)を roundTrip メソッドにフィードバックするためのチャネルです。これにより、書き込みの成功/失敗を roundTrip が知ることができます。

getConn 関数

  • persistConn のインスタンスが作成される際に、writech が初期化され、go pconn.writeLoop() が呼び出されます。これにより、コネクションごとに専用の writeLoop ゴルーチンが起動され、リクエストの書き込みを非同期で処理する準備が整います。

writeLoop ゴルーチン

このゴルーチンは、リクエストボディの実際の書き込みを担当します。

func (pc *persistConn) writeLoop() {
	for wr := range pc.writech { // (1) writech から書き込み要求を読み取る
		if pc.isBroken() { // (2) コネクションが壊れていないかチェック
			wr.ch <- errors.New("http: can't write HTTP request on broken connection")
			continue
		}
		err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む
		if err == nil {
			err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信
		}
		if err != nil {
			pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする
		}
		wr.ch <- err // (6) 結果を roundTrip に返す
	}
}
  1. for wr := range pc.writech: writech から writeRequest を受け取るまでブロックします。チャネルがクローズされるとループを終了します。
  2. if pc.isBroken(): コネクションがすでに壊れている場合、書き込みは行わず、エラーを返します。
  3. wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra): HTTPリクエストのヘッダーとボディを bufio.Writer (pc.bw) に書き込みます。この時点ではまだ物理的なネットワークには送信されません。
  4. err = pc.bw.Flush(): pc.bw にバッファリングされたデータを物理的なネットワークコネクションに送信します。ここで実際にデータがサーバーに送られます。
  5. if err != nil { pc.markBroken() }: 書き込みまたはフラッシュ中にエラーが発生した場合、そのコネクションを「壊れた」状態としてマークします。これにより、このコネクションが将来のHTTPリクエストに再利用されるのを防ぎます。
  6. wr.ch <- err: 書き込み操作の結果(エラーまたはnil)を、writeRequest に含まれるチャネルを通じて roundTrip ゴルーチンに返します。

roundTrip メソッド

このメソッドは、単一のHTTPリクエストを送信し、レスポンスを受け取るクライアント側の主要なロジックを含んでいます。

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	// ... (前略) ...

	// Write the request concurrently with waiting for a response,
	// in case the server decides to reply before reading our full
	// request body.
	writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル
	pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信

	resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル
	pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信

	var re responseAndError
WaitResponse:
	for {
		select { // (5) writeErrCh と resc の両方を同時に待機
		case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合
			if err != nil {
				re = responseAndError{nil, err}
				break WaitResponse
			}
		case re = <-resc: // (7) レスポンスが到着した場合
			break WaitResponse
		}
	}

	// ... (後略) ...
	return re.res, re.err
}
  1. writeErrCh := make(chan error, 1): writeLoop から書き込みエラーを受け取るためのチャネルを作成します。バッファリングされているため、writeLooproundTrip がチャネルを読み取るのを待たずにエラーを送信できます。
  2. pc.writech <- writeRequest{req, writeErrCh}: writeRequest を作成し、pc.writech に送信します。これにより、writeLoop ゴルーチンがこのリクエストの書き込みを開始します。この操作は非同期で行われます。
  3. resc := make(chan responseAndError, 1): readLoop からサーバーレスポンスを受け取るためのチャネルを作成します。
  4. pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}: readLoop ゴルーチンに、このリクエストに対するレスポンスを待機するよう指示します。
  5. select { ... }: ここがこの変更の最も重要な部分です。roundTrip は、writeErrChresc の両方からのイベントを同時に待機します。
  6. case err := <-writeErrCh:: もし writeLoop がリクエストの書き込み中にエラー(例: "broken pipe")を検出して writeErrCh に送信した場合、このケースが実行されます。roundTrip はそのエラーを即座に処理し、レスポンスの待機を中断してエラーを返します。
  7. case re = <-resc:: もし readLoop がサーバーからのレスポンスを先に受け取って resc に送信した場合、このケースが実行されます。roundTrip はレスポンスを処理し、書き込みエラーの発生を待たずにレスポンスを返します。

markBroken メソッド

  • pc.broken = true: コネクションが再利用できない状態であることを示すフラグを設定します。
  • このメソッドは、基盤となるTCPコネクションをすぐに閉じません。これは、readLoop がまだそのコネクションから残りのデータを読み取っている可能性があるためです。コネクションの実際のクローズは、readLoop が終了するか、または他のメカニズムによって行われます。

これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みを真に並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。

関連リンク

参考にした情報源リンク

  • Go言語の公式ドキュメント: net/http パッケージ
  • Go言語の並行処理に関するドキュメント(Goroutines, Channels, Select)
  • HTTP/1.1 プロトコル仕様 (RFC 2616 または RFC 7230-7235)
  • TCP/IP プロトコルに関する一般的な知識
  • bufio パッケージのドキュメント
  • io パッケージのドキュメント
  • Go言語のテストフレームワーク testing パッケージのドキュメント
  • httptest パッケージのドキュメント
  • Web検索結果: "Go net/http Issue 3595" (このコミットが修正した問題に関連する議論や、rstAvoidanceDelay などの関連概念について言及されている可能性があります。)

[インデックス 13364] ファイルの概要

このコミットは、Go言語の標準ライブラリ net/http パッケージにおけるクライアントの挙動を改善するものです。具体的には、HTTPリクエストのボディを送信している最中にサーバーからレスポンスが返された場合(例えば、認証エラーなど)、以前はクライアントがそのレスポンスを適切に処理できず、書き込みエラー("broken pipe"など)として扱ってしまう問題がありました。このコミットにより、リクエストの書き込みとレスポンスの読み込みが並行して行われるようになり、このようなシナリオでもサーバーからのレスポンスを正確に受け取れるようになりました。

コミット

commit a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4
Author: Brad Fitzpatrick <bradfitz@golang.org>
Date:   Tue Jun 19 09:20:41 2012 -0700

    net/http: make client await response concurrently with writing request
    
    If the server replies with an HTTP response before we're done
    writing our body (for instance "401 Unauthorized" response), we
    were previously ignoring that, since we returned our write
    error ("broken pipe", etc) before ever reading the response.
    Now we read and write at the same time.
    
    Fixes #3595
    
    R=rsc, adg
    CC=golang-dev
    https://golang.org/cl/6238043

GitHub上でのコミットページへのリンク

https://github.com/golang/go/commit/a5aa91b9a2a013be9ced9ae05474e1dce2fe16d4

元コミット内容

net/http: クライアントがリクエストの書き込みと並行してレスポンスを待機するようにする

もしサーバーがリクエストボディの書き込みが完了する前にHTTPレスポンスを返した場合(例えば「401 Unauthorized」レスポンスなど)、以前はクライアントがそれを無視していました。なぜなら、レスポンスを読み込む前に書き込みエラー(「broken pipe」など)を返していたからです。 これで、読み込みと書き込みが同時に行われるようになります。

Fixes #3595

R=rsc, adg CC=golang-dev https://golang.org/cl/6238043

変更の背景

この変更の背景には、HTTPクライアントがリクエストボディをサーバーに送信している途中で、サーバーが早期にレスポンスを返すという特定のシナリオにおける問題がありました。

従来の net/http クライアントの実装では、HTTPリクエストの送信(特にボディの書き込み)とサーバーからのレスポンスの受信が、ある程度シーケンシャルに行われていました。つまり、クライアントはまずリクエスト全体を書き込もうとし、その書き込みが完了するか、あるいはエラーが発生するまで、サーバーからのレスポンスを待機しませんでした。

このシーケンシャルな挙動が問題となるのは、以下のようなケースです。

  1. 早期レスポンス: サーバーがリクエストボディの全てを受け取る前に、何らかの理由でレスポンスを返す場合。例えば、認証が必要なエンドポイントに対して認証情報なしで大きなボディをPOSTした場合、サーバーはボディの受信を待たずに「401 Unauthorized」のようなエラーレスポンスをすぐに返すことがあります。
  2. 「broken pipe」問題: 従来のクライアントは、リクエストボディの書き込み中にサーバーが接続を切断した場合(早期レスポンスを返した後など)、write システムコールが「broken pipe」エラーを返していました。クライアントはこの書き込みエラーを即座に処理し、レスポンスを読み込むことなくエラーを返していました。
  3. 真のレスポンスの喪失: 結果として、クライアントはサーバーが意図した「401 Unauthorized」のような意味のあるレスポンスを受け取ることができず、単なる書き込みエラーとして処理してしまい、問題の根本原因を特定するのが困難でした。

このコミットは、このような状況下でもクライアントがサーバーからの早期レスポンスを適切に受け取り、処理できるようにするために導入されました。これにより、クライアントはより堅牢になり、サーバーの挙動に柔軟に対応できるようになります。

前提知識の解説

このコミットを理解するためには、以下の技術的知識が役立ちます。

  1. HTTPプロトコル:

    • リクエスト/レスポンスモデル: クライアントがリクエストを送信し、サーバーがレスポンスを返すという基本的なHTTPの通信モデル。
    • リクエストボディ: POSTやPUTなどのメソッドで、クライアントがサーバーに送信するデータ本体。
    • レスポンスステータスコード: サーバーがリクエストの結果を示す3桁の数字(例: 200 OK, 401 Unauthorized, 500 Internal Server Error)。
    • TCPソケット: HTTP通信の基盤となるトランスポート層のプロトコル。データはストリームとして送受信されます。
  2. Go言語の並行処理:

    • Goroutine (ゴルーチン): Go言語における軽量なスレッドのようなもの。非常に低コストで多数生成でき、並行処理を実現します。
    • Channel (チャネル): Goroutine間で安全にデータを送受信するための通信メカニズム。チャネルを介した通信は同期的に行われるため、データ競合を防ぎます。
    • select ステートメント: 複数のチャネル操作を同時に待機し、準備ができたチャネル操作を実行するためのGoの構文。非同期処理の調整に不可欠です。
  3. net/http パッケージ:

    • Transport: net/http パッケージにおいて、実際のHTTPリクエストの送信とレスポンスの受信を担当するインターフェース。コネクションの管理、リクエストの多重化、プロキシの処理などを行います。
    • persistConn: Transport の内部で、単一のTCPコネクションを介して複数のHTTPリクエスト/レスポンスを処理するための構造体。HTTP/1.1のKeep-Alive機能を実現するために使用されます。
    • readLoop: persistConn 内で、サーバーからのレスポンスを継続的に読み込むためのGoroutine。
    • roundTrip: Transport インターフェースの主要なメソッドで、単一のHTTPリクエストを送信し、対応するレスポンスを受け取る一連の処理をカプセル化します。
  4. I/O操作とバッファリング:

    • io.Reader / io.Writer: Go言語における基本的なI/Oインターフェース。データの読み書き操作を抽象化します。
    • bufio.Reader / bufio.Writer: バッファリングされたI/Oを提供する構造体。これにより、小さなI/O操作が多数発生するのを防ぎ、効率を向上させます。Flush() メソッドはバッファの内容を実際に書き込み先に送るために重要です。
    • 「broken pipe」エラー: パイプやソケットの書き込み側が、読み込み側がすでにクローズされている状態で書き込みを試みた場合に発生するエラー。TCPソケット通信では、サーバーが接続を閉じた後にクライアントが書き込みを続けると発生します。

これらの概念を理解することで、コミットがどのようにして既存の課題を解決し、net/http クライアントの堅牢性を向上させたのかを深く把握できます。

技術的詳細

このコミットの核心は、HTTPクライアントがリクエストの書き込みとレスポンスの読み込みを並行して行うための新しいメカニズムを導入した点にあります。これにより、サーバーがリクエストボディの受信完了を待たずに早期にレスポンスを返した場合でも、クライアントがそのレスポンスを適切に処理できるようになります。

具体的な技術的変更点は以下の通りです。

  1. writech チャネルの導入:

    • persistConn 構造体に writech chan writeRequest という新しいチャネルが追加されました。このチャネルは、roundTrip ゴルーチンからリクエストの書き込み要求を受け取り、writeLoop ゴルーチンに渡す役割を担います。
    • writeRequest は、書き込むリクエスト (*transportRequest) と、書き込み結果を返すためのチャネル (chan<- error) を含む新しい構造体です。
  2. writeLoop ゴルーチンの導入:

    • persistConn の初期化時(getConn メソッド内)に、go pconn.writeLoop() という新しいゴルーチンが起動されるようになりました。
    • この writeLoop ゴルーチンは、pc.writech から writeRequest を継続的に読み取ります。
    • writeRequest を受け取ると、その中のリクエスト (wr.req.Request) を pc.bw (バッファリングされたライター) を使って実際にコネクションに書き込みます。
    • 書き込みが完了したら、pc.bw.Flush() を呼び出してバッファの内容を物理的なコネクションに送信します。
    • 書き込み中にエラーが発生した場合(例: サーバーが接続を閉じたことによる「broken pipe」)、pc.markBroken() を呼び出してコネクションを「壊れた」状態としてマークし、そのエラーを writeRequest に含まれるチャネル (wr.ch) を通じて roundTrip ゴルーチンに返します。
  3. roundTrip メソッドの変更:

    • 以前は roundTrip メソッド内でリクエストの書き込みとフラッシュを直接行い、その後レスポンスを待機していました。
    • 変更後、リクエストの書き込みは writeLoop ゴルーチンに委譲されるようになりました。具体的には、writeRequest を作成し、pc.writech に送信します。
    • 最も重要な変更は、select ステートメントの導入です。roundTrip は、writeErrCh (書き込みエラーを受け取るチャネル) と resc (レスポンスを受け取るチャネル) の両方を同時に待機するようになりました。
    • これにより、リクエストの書き込みが完了する前にサーバーからレスポンスが返された場合でも、resc チャネルからのイベントが先に発生し、クライアントは早期レスポンスを即座に処理できるようになります。
    • もし書き込みエラーが先に発生した場合、そのエラーが優先され、レスポンスの待機は中断されます。
  4. markBroken メソッドの導入:

    • persistConnmarkBroken() という新しいメソッドが追加されました。これは、コネクションが再利用できない状態になったことをマークしますが、基盤となるコネクション自体はすぐに閉じません。これは、readLoop がまだそのコネクションからデータを読み取っている可能性があるためです。

これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みという2つの独立したタスクを並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。

コアとなるコードの変更箇所

このコミットによる主要なコード変更は、src/pkg/net/http/transport.gosrc/pkg/net/http/transport_test.go の2つのファイルに集中しています。

src/pkg/net/http/transport.go

  1. persistConn 構造体への writech の追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -487,7 +489,8 @@ type persistConn struct {
     	closed   bool                // whether conn has been closed
     	br       *bufio.Reader       // from conn
     	bw       *bufio.Writer       // to conn
    -	reqch    chan requestAndChan // written by roundTrip(); read by readLoop()
    +	reqch    chan requestAndChan // written by roundTrip; read by readLoop
    +	writech  chan writeRequest   // written by roundTrip; read by writeLoop
     	isProxy  bool
    

    persistConnwritech チャネルが追加され、リクエストの書き込み要求を writeLoop に送るために使用されます。

  2. getConnwritech の初期化と writeLoop の起動:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -323,6 +323,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
     		cacheKey: cm.String(),
     		conn:     conn,
     		reqch:    make(chan requestAndChan, 50),\n
    +		writech:  make(chan writeRequest, 50),
     	}
     
     	switch {
    @@ -380,6 +381,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
     	pconn.br = bufio.NewReader(pconn.conn)
     	pconn.bw = bufio.NewWriter(pconn.conn)
     	go pconn.readLoop()
    +	go pconn.writeLoop()
     	return pconn, nil
     }
    

    新しいコネクションが確立される際に、writech が初期化され、writeLoop ゴルーチンが起動されます。

  3. readLoopwritech をクローズ:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -519,6 +522,7 @@ func remoteSideClosed(err error) bool {
     }
     
     func (pc *persistConn) readLoop() {
    +	defer close(pc.writech)
     	alive := true
     	var lastbody io.ReadCloser // last response body, if any, read on this connection
    

    readLoop が終了する際に writech をクローズすることで、writeLoop が適切に終了できるようにします。

  4. writeLoop ゴルーチンの追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -615,6 +619,23 @@ func (pc *persistConn) readLoop() {
     	}
     }
     
    +func (pc *persistConn) writeLoop() {
    +	for wr := range pc.writech { // (1) writech から書き込み要求を読み取る
    +		if pc.isBroken() { // (2) コネクションが壊れていないかチェック
    +			wr.ch <- errors.New("http: can't write HTTP request on broken connection")
    +			continue
    +		}
    +		err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む
    +		if err == nil {
    +			err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信
    +		}
    +		if err != nil {
    +			pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする
    +		}
    +		wr.ch <- err // (6) 結果を roundTrip に返す
    +	}
    +}
    +
     type responseAndError struct {
      res *Response
      err error
    

    リクエストの書き込みとフラッシュを担当する新しいゴルーチン writeLoop が追加されました。

  5. writeRequest 構造体の追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -630,6 +651,15 @@ type requestAndChan struct {
      addedGzip bool
     }
     
    +// A writeRequest is sent by the readLoop's goroutine to the
    +// writeLoop's goroutine to write a request while the read loop
    +// concurrently waits on both the write response and the server's
    +// reply.
    +type writeRequest struct {
    +	req *transportRequest
    +	ch  chan<- error
    +}
    +
     func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
      if pc.mutateHeaderFunc != nil {
       pc.mutateHeaderFunc(req.extraHeaders())
    

    writeLooproundTrip 間で書き込み要求をやり取りするための writeRequest 構造体が定義されました。

  6. roundTrip メソッドの並行処理ロジックの変更:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -652,16 +682,29 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
      pc.numExpectedResponses++
      pc.lk.Unlock()
     
    -	err = req.Request.write(pc.bw, pc.isProxy, req.extra)
    -	if err != nil {
    -		pc.close()
    -		return
    +	// Write the request concurrently with waiting for a response,
    +	// in case the server decides to reply before reading our full
    +	// request body.
    +	writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル
    +	pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信
    +
    +	resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル
    +	pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信
    +
    +	var re responseAndError
    +WaitResponse:
    +	for {
    +		select { // (5) writeErrCh と resc の両方を同時に待機
    +		case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合
    +			if err != nil {
    +				re = responseAndError{nil, err}
    +				break WaitResponse
    +			}
    +		case re = <-resc: // (7) レスポンスが到着した場合
    +			break WaitResponse
    +		}
      }
    -	pc.bw.Flush()
     
    -	ch := make(chan responseAndError, 1)
    -	pc.reqch <- requestAndChan{req.Request, ch, requestedGzip}
    -	re := <-ch
      pc.lk.Lock()
      pc.numExpectedResponses--
      pc.lk.Unlock()
    

    リクエストの書き込みを writech に送信し、writeErrChresc の両方を select で同時に待機するようになりました。

  7. markBroken メソッドの追加:

    --- a/src/pkg/net/http/transport.go
    +++ b/src/pkg/net/http/transport.go
    @@ -669,6 +712,15 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
      return re.res, re.err
     }
     
    +// markBroken marks a connection as broken (so it's not reused).
    +// It differs from close in that it doesn't close the underlying
    +// connection for use when it's still being read.
    +func (pc *persistConn) markBroken() {
    +	pc.lk.Lock()
    +	defer pc.lk.Unlock()
    +	pc.broken = true
    +}
    +
     func (pc *persistConn) close() {
      pc.lk.Lock()
      defer pc.lk.Unlock()
    

    コネクションを「壊れた」状態としてマークするためのヘルパーメソッドが追加されました。

src/pkg/net/http/transport_test.go

  1. TestIssue3595 テストケースの追加:
    --- a/src/pkg/net/http/transport_test.go
    +++ b/src/pkg/net/http/transport_test.go
    @@ -833,6 +833,30 @@ func TestIssue3644(t *testing.T) {
      }
     }
     
    +// Test that a client receives a server's reply, even if the server doesn't read
    +// the entire request body.
    +func TestIssue3595(t *testing.T) {
    +	const deniedMsg = "sorry, denied."
    +	ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
    +		Error(w, deniedMsg, StatusUnauthorized)
    +	}))
    +	defer ts.Close()
    +	tr := &Transport{}
    +	c := &Client{Transport: tr}
    +	res, err := c.Post(ts.URL, "application/octet-stream", neverEnding('a'))
    +	if err != nil {
    +		t.Errorf("Post: %v", err)
    +		return
    +	}
    +	got, err := ioutil.ReadAll(res.Body)
    +	if err != nil {
    +		t.Fatalf("Body ReadAll: %v", err)
    +	}
    +	if !strings.Contains(string(got), deniedMsg) {
    +		t.Errorf("Known bug: response %q does not contain %q", got, deniedMsg)
    +	}
    +}
    +
     type fooProto struct{}
     
     func (fooProto) RoundTrip(req *Request) (*Response, error) {
    
    このテストケースは、サーバーがリクエストボディを完全に読み取る前にレスポンスを返した場合でも、クライアントがそのレスポンスを正しく受け取れることを検証します。特に、neverEnding('a') という無限のボディを送信し、サーバーが 401 Unauthorized を返すシナリオをシミュレートしています。

これらの変更は、net/http クライアントの堅牢性と信頼性を大幅に向上させるものです。

コアとなるコードの解説

このコミットのコアとなる変更は、net/http/transport.go 内の persistConn 構造体と、それに関連する roundTripreadLoop、そして新しく追加された writeLoop ゴルーチンの協調動作にあります。

persistConn 構造体

  • writech chan writeRequest: この新しいチャネルは、roundTrip メソッドから writeLoop ゴルーチンへのリクエスト書き込み要求を伝達するために使用されます。バッファリングされたチャネル(容量50)であるため、ある程度の要求をキューに入れることができます。

writeRequest 構造体

  • req *transportRequest: 実際に書き込むHTTPリクエストの情報を保持します。
  • ch chan<- error: 書き込み操作の結果(エラーまたはnil)を roundTrip メソッドにフィードバックするためのチャネルです。これにより、書き込みの成功/失敗を roundTrip が知ることができます。

getConn 関数

  • persistConn のインスタンスが作成される際に、writech が初期化され、go pconn.writeLoop() が呼び出されます。これにより、コネクションごとに専用の writeLoop ゴルーチンが起動され、リクエストの書き込みを非同期で処理する準備が整います。

writeLoop ゴルーチン

このゴルーチンは、リクエストボディの実際の書き込みを担当します。

func (pc *persistConn) writeLoop() {
	for wr := range pc.writech { // (1) writech から書き込み要求を読み取る
		if pc.isBroken() { // (2) コネクションが壊れていないかチェック
			wr.ch <- errors.New("http: can't write HTTP request on broken connection")
			continue
		}
		err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) // (3) リクエストをバッファに書き込む
		if err == nil {
			err = pc.bw.Flush() // (4) バッファをフラッシュし、物理コネクションに送信
		}
		if err != nil {
			pc.markBroken() // (5) エラーがあればコネクションを壊れた状態にする
		}
		wr.ch <- err // (6) 結果を roundTrip に返す
	}
}
  1. for wr := range pc.writech: writech から writeRequest を受け取るまでブロックします。チャネルがクローズされるとループを終了します。
  2. if pc.isBroken(): コネクションがすでに壊れている場合、書き込みは行わず、エラーを返します。
  3. wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra): HTTPリクエストのヘッダーとボディを bufio.Writer (pc.bw) に書き込みます。この時点ではまだ物理的なネットワークには送信されません。
  4. err = pc.bw.Flush(): pc.bw にバッファリングされたデータを物理的なネットワークコネクションに送信します。ここで実際にデータがサーバーに送られます。
  5. if err != nil { pc.markBroken() }: 書き込みまたはフラッシュ中にエラーが発生した場合、そのコネクションを「壊れた」状態としてマークします。これにより、このコネクションが将来のHTTPリクエストに再利用されるのを防ぎます。
  6. wr.ch <- err: 書き込み操作の結果(エラーまたはnil)を、writeRequest に含まれるチャネルを通じて roundTrip ゴルーチンに返します。

roundTrip メソッド

このメソッドは、単一のHTTPリクエストを送信し、レスポンスを受け取るクライアント側の主要なロジックを含んでいます。

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	// ... (前略) ...

	// Write the request concurrently with waiting for a response,
	// in case the server decides to reply before reading our full
	// request body.
	writeErrCh := make(chan error, 1) // (1) 書き込みエラーを受け取るチャネル
	pc.writech <- writeRequest{req, writeErrCh} // (2) writeLoop に書き込み要求を送信

	resc := make(chan responseAndError, 1) // (3) レスポンスを受け取るチャネル
	pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} // (4) readLoop にレスポンス待機要求を送信

	var re responseAndError
WaitResponse:
	for {
		select { // (5) writeErrCh と resc の両方を同時に待機
		case err := <-writeErrCh: // (6) 書き込みエラーが発生した場合
			if err != nil {
				re = responseAndError{nil, err}
				break WaitResponse
			}
		case re = <-resc: // (7) レスポンスが到着した場合
			break WaitResponse
		}
	}

	// ... (後略) ...
	return re.res, re.err
}
  1. writeErrCh := make(chan error, 1): writeLoop から書き込みエラーを受け取るためのチャネルを作成します。バッファリングされているため、writeLooproundTrip がチャネルを読み取るのを待たずにエラーを送信できます。
  2. pc.writech <- writeRequest{req, writeErrCh}: writeRequest を作成し、pc.writech に送信します。これにより、writeLoop ゴルーチンがこのリクエストの書き込みを開始します。この操作は非同期で行われます。
  3. resc := make(chan responseAndError, 1): readLoop からサーバーレスポンスを受け取るためのチャネルを作成します。
  4. pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}: readLoop ゴルーチンに、このリクエストに対するレスポンスを待機するよう指示します。
  5. select { ... }: ここがこの変更の最も重要な部分です。roundTrip は、writeErrChresc の両方を同時に待機します。
  6. case err := <-writeErrCh:: もし writeLoop がリクエストの書き込み中にエラー(例: "broken pipe")を検出して writeErrCh に送信した場合、このケースが実行されます。roundTrip はそのエラーを即座に処理し、レスポンスの待機を中断してエラーを返します。
  7. case re = <-resc:: もし readLoop がサーバーからのレスポンスを先に受け取って resc に送信した場合、このケースが実行されます。roundTrip はレスポンスを処理し、書き込みエラーの発生を待たずにレスポンスを返します。

markBroken メソッド

  • pc.broken = true: コネクションが再利用できない状態であることを示すフラグを設定します。
  • このメソッドは、基盤となるTCPコネクションをすぐに閉じません。これは、readLoop がまだそのコネクションから残りのデータを読み取っている可能性があるためです。コネクションの実際のクローズは、readLoop が終了するか、または他のメカニズムによって行われます。

これらの変更により、net/http クライアントは、リクエストの書き込みとレスポンスの読み込みを真に並行して実行できるようになり、サーバーからの早期レスポンスを適切に処理し、より堅牢な通信を実現します。

関連リンク

参考にした情報源リンク

  • Go言語の公式ドキュメント: net/http パッケージ
  • Go言語の並行処理に関するドキュメント(Goroutines, Channels, Select)
  • HTTP/1.1 プロトコル仕様 (RFC 2616 または RFC 7230-7235)
  • TCP/IP プロトコルに関する一般的な知識
  • bufio パッケージのドキュメント
  • io パッケージのドキュメント
  • Go言語のテストフレームワーク testing パッケージのドキュメント
  • httptest パッケージのドキュメント
  • Web検索結果: "Go net/http Issue 3595" (このコミットが修正した問題に関連する議論や、rstAvoidanceDelay などの関連概念について言及されている可能性があります。)