Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

[インデックス 11435] ファイルの概要

このコミットは、Go言語の net/rpc パッケージにおけるクライアントの実装に関する複数の問題を修正するものです。具体的には、Call.Error におけるデータ競合の解消、Done チャネルへの複数回送信の可能性の排除、一時的な Write エラー発生時のメモリリークの修正、Client.shutdown および Client.closing におけるデータ競合の修正、そしてコメントの修正が含まれます。これらの修正は、RPCクライアントの堅牢性と信頼性を向上させることを目的としています。

コミット

commit 75397e65ee29e38ec89db58885fa6cf6e52ca558
Author: Dmitriy Vyukov <dvyukov@google.com>
Date:   Fri Jan 27 11:27:05 2012 +0400

    net/rpc: fix data race on Call.Error
    +eliminates a possibility of sending a call to Done several times.
    +fixes memory leak in case of temporal Write errors.
    +fixes data race on Client.shutdown.
    +fixes data race on Client.closing.
    +fixes comments.
    Fixes #2780.
    
    R=r, rsc
    CC=golang-dev, mpimenov
    https://golang.org/cl/5571063

GitHub上でのコミットページへのリンク

https://github.com/golang/go/commit/75397e65ee29e38ec89db58885fa6cf6e52ca558

元コミット内容

net/rpc: fix data race on Call.Error
+eliminates a possibility of sending a call to Done several times.
+fixes memory leak in case of temporal Write errors.
+fixes data race on Client.shutdown.
+fixes data race on Client.closing.
+fixes comments.
Fixes #2780.

R=r, rsc
CC=golang-dev, mpimenov
https://golang.org/cl/5571063

変更の背景

このコミットは、Go言語の net/rpc パッケージのクライアント実装における複数の深刻なバグに対処するために行われました。主な背景は以下の通りです。

  1. Call.Error のデータ競合: Call 構造体の Error フィールドは、RPC呼び出しの完了後にエラー状態を保持します。複数のゴルーチンが同時にこのフィールドにアクセスし、書き込みを行う可能性があるため、データ競合が発生していました。これは予測不能な動作やクラッシュを引き起こす可能性があります。
  2. Done チャネルへの複数回送信: Call.Done チャネルは、RPC呼び出しが完了したことを通知するために使用されます。しかし、特定の条件下で、同じ Call オブジェクトが Done チャネルに複数回送信される可能性がありました。これは、チャネルのセマンティクスに反し、受信側で予期せぬ動作を引き起こす可能性があります。
  3. 一時的な Write エラー時のメモリリーク: クライアントがリクエストを送信する際に一時的な書き込みエラー(例: ネットワークの一時的な問題)が発生した場合、その Call オブジェクトが Client.pending マップから適切に削除されず、メモリリークが発生する可能性がありました。これにより、時間とともにメモリ使用量が増加し、システムのパフォーマンスに影響を与える可能性があります。
  4. Client.shutdown および Client.closing のデータ競合: クライアントのシャットダウン処理中に Client.shutdown および Client.closing フラグにアクセスする際に、複数のゴルーチン間でデータ競合が発生していました。これは、シャットダウン処理の信頼性を損ない、未定義の動作につながる可能性があります。
  5. コメントの修正: コードの可読性と理解を向上させるために、既存のコメントが修正されました。

これらの問題は、RPCクライアントの安定性と信頼性に直接影響を与えるため、早急な修正が必要でした。特にデータ競合はデバッグが困難であり、本番環境での予期せぬ障害につながる可能性があります。Fixes #2780 は、これらの問題がGoのIssueトラッカーで報告されていたことを示唆しています。

前提知識の解説

このコミットの変更内容を理解するためには、以下のGo言語の概念と net/rpc パッケージの基本的な知識が必要です。

Go言語の並行処理

  • ゴルーチン (Goroutines): Go言語における軽量なスレッドのようなものです。非常に少ないメモリで多数のゴルーチンを同時に実行できます。
  • チャネル (Channels): ゴルーチン間で値を送受信するための通信メカニズムです。チャネルは、ゴルーチン間の同期と通信を安全に行うための主要な手段です。
  • sync.Mutex: 相互排他ロックを提供し、共有リソースへのアクセスを同期するために使用されます。これにより、複数のゴルーチンが同時に同じデータに書き込むことによるデータ競合を防ぎます。Lock() でロックを取得し、Unlock() でロックを解放します。defer キーワードと組み合わせることで、関数の終了時に確実にロックが解放されるようにできます。
  • データ競合 (Data Race): 複数のゴルーチンが同時に同じメモリ位置にアクセスし、少なくとも1つのアクセスが書き込みであり、かつそれらのアクセスが同期されていない場合に発生します。データ競合は、予測不能な結果やプログラムのクラッシュを引き起こす可能性があります。

net/rpc パッケージ

net/rpc パッケージは、Go言語でRPC (Remote Procedure Call) クライアントとサーバーを実装するための標準ライブラリです。これにより、異なるプロセスやネットワーク上のマシン間で関数呼び出しを行うことができます。

  • rpc.Client: RPCサーバーへの接続を表すクライアントオブジェクトです。
  • rpc.Call: 単一のRPC呼び出しを表す構造体です。
    • ServiceMethod: 呼び出すサービスとメソッドの名前。
    • Args: メソッドに渡す引数。
    • Reply: メソッドからの戻り値を格納するポインタ。
    • Error: 呼び出しが完了した後のエラー状態。
    • Done: 呼び出しが完了したときに値が送信されるチャネル。
  • Client.Call(): 同期的にRPC呼び出しを実行し、結果が返されるまでブロックします。
  • Client.Go(): 非同期的にRPC呼び出しを開始し、*Call オブジェクトをすぐに返します。呼び出しの完了は Call.Done チャネルで通知されます。
  • ClientCodec インターフェース: RPCメッセージのエンコードとデコードを行うためのインターフェースです。これにより、JSON-RPCやGob-RPCなど、さまざまなプロトコルをサポートできます。
  • client.send() メソッド: クライアントがRPCリクエストをエンコードし、サーバーに送信する内部メソッドです。
  • client.input() メソッド: クライアントがサーバーからのRPCレスポンスを読み取り、対応する Call オブジェクトを処理する内部メソッドです。

その他のGo言語の概念

  • io.EOF: ファイルの終端 (End Of File) を示すエラーです。ネットワーク接続が正常に閉じられた場合など、ストリームの終端に達したことを示します。
  • log.Println(): 標準エラー出力にログメッセージを出力するための関数です。

これらの概念を理解することで、コミットで行われた変更がなぜ必要であり、どのように機能するのかを深く把握することができます。

技術的詳細

このコミットは、src/pkg/net/rpc/client.go ファイルに対して行われ、主に Client 構造体と Call 構造体、およびそれらに関連するメソッドの並行処理に関する問題を解決しています。

Call 構造体の変更

--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -31,8 +31,7 @@ type Call struct {
 	Args          interface{} // The argument to the function (*struct).
 	Reply         interface{} // The reply from the function (*struct).\
 	Error         error       // After completion, the error status.
-	Done          chan *Call  // Strobes when call is complete; value is the error status.
-	seq           uint64
+	Done          chan *Call  // Strobes when call is complete.
 }
  • Call.Done のコメントから「; value is the error status.」が削除されました。これは、Done チャネルが *Call オブジェクト自体を送信するだけであり、エラー状態は Call.Error フィールドで確認されるべきであることを明確にするためです。
  • Call.seq フィールドが Call 構造体から削除されました。これは、seqClient 内部で管理されるべき情報であり、Call オブジェクト自体が持つ必要がないためです。これにより、Call 構造体の責務がより明確になります。

Client.send() メソッドの変更

send メソッドは、RPCリクエストをサーバーに送信する役割を担います。このメソッドには、データ競合とメモリリークの問題がありました。

--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -65,28 +64,33 @@ type ClientCodec interface {
 	Close() error
 }\
 \
-func (client *Client) send(c *Call) {
+func (client *Client) send(call *Call) {
+\tclient.sending.Lock()
+\tdefer client.sending.Unlock()
+\
 	// Register this call.
 	client.mutex.Lock()
 	if client.shutdown {
-\t\tc.Error = ErrShutdown
+\t\tcall.Error = ErrShutdown
 	\tclient.mutex.Unlock()
-\t\tc.done()
+\t\tcall.done()
 	\treturn
 	}
-\tc.seq = client.seq
+\tseq := client.seq
 	client.seq++
-\tclient.pending[c.seq] = c
+\tclient.pending[seq] = call
 	client.mutex.Unlock()
 \
 	// Encode and send the request.
-\tclient.sending.Lock()
-\tdefer client.sending.Unlock()
-\tclient.request.Seq = c.seq
-\tclient.request.ServiceMethod = c.ServiceMethod
-\tif err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
-\t\tc.Error = err
-\t\tc.done()
+\tclient.request.Seq = seq
+\tclient.request.ServiceMethod = call.ServiceMethod
+\terr := client.codec.WriteRequest(&client.request, call.Args)
+\tif err != nil {
+\t\tclient.mutex.Lock()
+\t\tdelete(client.pending, seq)
+\t\tclient.mutex.Unlock()
+\t\tcall.Error = err
+\t\tcall.done()
 \t}
 }
  • client.sending.Lock() の移動: 以前は client.request の書き込み直前にロックを取得していましたが、変更後は send メソッドの冒頭で client.sending.Lock() を取得し、defer で解放するように変更されました。これにより、client.request の準備から実際の書き込みまでの一連の処理が sending ミューテックスによって保護され、複数のゴルーチンが同時にリクエストを送信しようとした際のデータ競合が防止されます。
  • Call.seq のローカル変数化: c.seq を直接使用する代わりに、seq := client.seq としてローカル変数 seq にコピーし、この seqclient.pending マップのキーとして使用するように変更されました。これにより、Call 構造体から seq フィールドを削除することが可能になりました。
  • 書き込みエラー時のメモリリーク修正: client.codec.WriteRequest がエラーを返した場合、以前は c.Error = errc.done() を呼び出すだけでした。しかし、この場合 client.pending マップから Call オブジェクトが削除されないため、メモリリークが発生していました。修正後は、エラー発生時に client.mutex.Lock() を取得し、delete(client.pending, seq) を呼び出して Call オブジェクトをマップから明示的に削除するように変更されました。これにより、メモリリークが防止されます。

Client.input() メソッドの変更

input メソッドは、サーバーからのレスポンスを処理し、対応する Call オブジェクトを完了させる役割を担います。

--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -104,36 +108,39 @@ func (client *Client) input() {
 	\t\t}\
 	\t\tseq := response.Seq
 	\t\tclient.mutex.Lock()
-\t\t\tc := client.pending[seq]
+\t\t\tcall := client.pending[seq]
 	\t\tdelete(client.pending, seq)
 	\t\tclient.mutex.Unlock()
 \
 	\t\tif response.Error == "" {
-\t\t\t\terr = client.codec.ReadResponseBody(c.Reply)
+\t\t\t\terr = client.codec.ReadResponseBody(call.Reply)
 	\t\t\tif err != nil {
-\t\t\t\t\tc.Error = errors.New(\"reading body \" + err.Error())
+\t\t\t\t\tcall.Error = errors.New(\"reading body \" + err.Error())
 	\t\t\t}\
 	\t\t} else {
 	\t\t\t// We've got an error response. Give this to the request;
 	\t\t\t// any subsequent requests will get the ReadResponseBody
 	\t\t\t// error if there is one.
-\t\t\t\tc.Error = ServerError(response.Error)
+\t\t\t\tcall.Error = ServerError(response.Error)
 	\t\t\terr = client.codec.ReadResponseBody(nil)
 	\t\t\tif err != nil {
 	\t\t\t\terr = errors.New(\"reading error body: \" + err.Error())
 	\t\t\t}\
 	\t\t}
-\t\t\tc.done()
+\t\t\tcall.done()
 	\t}
 	\t// Terminate pending calls.
+\tclient.sending.Lock()
 	client.mutex.Lock()
 	client.shutdown = true
+\tclosing := client.closing
 	for _, call := range client.pending {
 	\t\tcall.Error = err
 	\t\tcall.done()
 	}
 	client.mutex.Unlock()
-\tif err != io.EOF || !client.closing {
+\tclient.sending.Unlock()
+\tif err != io.EOF || !closing {
 	\t\tlog.Println(\"rpc: client protocol error:\", err)
 	\t}
  }
  • c から call への変数名変更: 可読性向上のため、c という変数名が call に変更されました。
  • シャットダウン処理の改善とデータ競合修正:
    • client.sending.Lock()input メソッドの終端、client.mutex.Lock() の直前に移動されました。これにより、シャットダウン時に client.sending ミューテックスが適切に保護され、send メソッドとのデータ競合が防止されます。
    • client.closing の値が client.mutex ロック内でローカル変数 closing にコピーされ、ロック解放後にこのローカル変数を使用するように変更されました。これにより、client.closing フィールドへのアクセスに関するデータ競合が解消されます。

Client.Go() および Client.Call() メソッドの変更

これらのメソッドは、クライアントのシャットダウン状態に関する冗長なチェックを削除しました。

--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -269,20 +276,12 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface\
 	\t\t}\
 	\t}\
 	\tcall.Done = done
-\tif client.shutdown {
-\t\tcall.Error = ErrShutdown
-\t\tcall.done()
-\t\treturn call
-\t}\
 	\tclient.send(call)
 	\treturn call
  }
 \
  // Call invokes the named function, waits for it to complete, and returns its error status.
  func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
-\tif client.shutdown {
-\t\treturn ErrShutdown
-\t}\
 	\tcall := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
 	\treturn call.Error
  }
  • Client.Go() および Client.Call() メソッドから、client.shutdown の状態をチェックして ErrShutdown を返すロジックが削除されました。このチェックは client.send() メソッド内で既に適切に処理されるため、冗長であり、削除することでコードの重複が解消され、一貫性が向上します。

これらの変更により、net/rpc クライアントの並行処理の堅牢性が大幅に向上し、データ競合やメモリリークといった深刻な問題が解決されました。

コアとなるコードの変更箇所

src/pkg/net/rpc/client.go

--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -31,8 +31,7 @@ type Call struct {
 	Args          interface{} // The argument to the function (*struct).
 	Reply         interface{} // The reply from the function (*struct).
 	Error         error       // After completion, the error status.
-\tDone          chan *Call  // Strobes when call is complete; value is the error status.
-\tseq           uint64
+\tDone          chan *Call  // Strobes when call is complete.
 }
 
 // Client represents an RPC Client.
@@ -65,28 +64,33 @@ type ClientCodec interface {
 	Close() error
 }
 
-func (client *Client) send(c *Call) {
+func (client *Client) send(call *Call) {
+\tclient.sending.Lock()
+\tdefer client.sending.Unlock()
+\
 	// Register this call.
 	client.mutex.Lock()
 	if client.shutdown {
-\t\tc.Error = ErrShutdown
+\t\tcall.Error = ErrShutdown
 \t\tclient.mutex.Unlock()
-\t\tc.done()
+\t\tcall.done()
 \t\treturn
 	}
-\tc.seq = client.seq
+\tseq := client.seq
 	client.seq++
-\tclient.pending[c.seq] = c
+\tclient.pending[seq] = call
 	client.mutex.Unlock()
 
 	// Encode and send the request.
-\tclient.sending.Lock()
-\tdefer client.sending.Unlock()
-\tclient.request.Seq = c.seq
-\tclient.request.ServiceMethod = c.ServiceMethod
-\tif err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
-\t\tc.Error = err
-\t\tc.done()
+\tclient.request.Seq = seq
+\tclient.request.ServiceMethod = call.ServiceMethod
+\terr := client.codec.WriteRequest(&client.request, call.Args)
+\tif err != nil {
+\t\tclient.mutex.Lock()
+\t\tdelete(client.pending, seq)
+\t\tclient.mutex.Unlock()
+\t\tcall.Error = err
+\t\tcall.done()
 \t}
 }
 
@@ -104,36 +108,39 @@ func (client *Client) input() {
 \t\t}\
 \t\tseq := response.Seq
 \t\tclient.mutex.Lock()
-\t\tc := client.pending[seq]
+\t\tcall := client.pending[seq]
 \t\tdelete(client.pending, seq)
 \t\tclient.mutex.Unlock()
 
 \t\tif response.Error == "" {
-\t\t\terr = client.codec.ReadResponseBody(c.Reply)
+\t\t\terr = client.codec.ReadResponseBody(call.Reply)
 \t\t\tif err != nil {
-\t\t\t\tc.Error = errors.New(\"reading body \" + err.Error())
+\t\t\t\tcall.Error = errors.New(\"reading body \" + err.Error())
 \t\t\t}\
 \t\t} else {
 \t\t\t// We've got an error response. Give this to the request;
 \t\t\t// any subsequent requests will get the ReadResponseBody
 \t\t\t// error if there is one.
-\t\t\tc.Error = ServerError(response.Error)
+\t\t\tcall.Error = ServerError(response.Error)
 \t\t\terr = client.codec.ReadResponseBody(nil)
 \t\t\tif err != nil {
 \t\t\t\terr = errors.New(\"reading error body: \" + err.Error())
 \t\t\t}\
 \t\t}
-\t\tc.done()
+\t\tcall.done()
 \t}
 \t// Terminate pending calls.
+\tclient.sending.Lock()
 \tclient.mutex.Lock()
 \tclient.shutdown = true
+\tclosing := client.closing
 \tfor _, call := range client.pending {
 \t\tcall.Error = err
 \t\tcall.done()
 \t}
 \tclient.mutex.Unlock()
-\tif err != io.EOF || !client.closing {
+\tclient.sending.Unlock()
+\tif err != io.EOF || !closing {
 \t\tlog.Println(\"rpc: client protocol error:\", err)
 \t}
 }
@@ -269,20 +276,12 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface\
 \t\t}\
 \t}\
 \tcall.Done = done
-\tif client.shutdown {
-\t\tcall.Error = ErrShutdown
-\t\tcall.done()
-\t\treturn call
-\t}\
 \tclient.send(call)
 \treturn call
 }
 
 // Call invokes the named function, waits for it to complete, and returns its error status.
 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
-\tif client.shutdown {
-\t\treturn ErrShutdown
-\t}\
 \tcall := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
 \treturn call.Error
 }

コアとなるコードの解説

Call 構造体 (src/pkg/net/rpc/client.go の変更)

  • Done チャネルのコメント修正: Done chan *Call // Strobes when call is complete.
    • 以前のコメント // Strobes when call is complete; value is the error status. は、Done チャネルがエラー状態を直接伝えるかのような誤解を招く可能性がありました。実際には、Done チャネルは *Call オブジェクト自体を送信し、エラー状態は Call オブジェクトの Error フィールドで確認されます。この修正により、Done チャネルの役割がより正確に記述されました。
  • seq フィールドの削除: seq uint64 が削除されました。
    • seq (シーケンス番号) は、RPC呼び出しを一意に識別するためにクライアント内部で管理されるべき情報であり、個々の Call オブジェクトが持つ必要はありませんでした。この変更により、Call 構造体の責務が簡素化され、Client 構造体内で seq の管理が一元化されました。

Client.send() メソッド (src/pkg/net/rpc/client.go の変更)

  • client.sending.Lock() の移動と defer の追加:
    +\tclient.sending.Lock()
    +\tdefer client.sending.Unlock()
    
    • 以前は client.request の設定と WriteRequest の直前でロックを取得していましたが、この変更により send メソッドの冒頭で client.sending ミューテックスが取得され、関数終了時に defer を使って解放されるようになりました。これにより、client.request の準備から実際のネットワーク書き込みまでの一連の処理が client.sending によって完全に保護され、複数のゴルーチンが同時にリクエストを送信しようとした際のデータ競合が確実に防止されます。
  • seq のローカル変数化と client.pending からの削除ロジックの追加:
    -\tc.seq = client.seq
    +\tseq := client.seq
     \tclient.seq++
    -\tclient.pending[c.seq] = c
    +\tclient.pending[seq] = call
    ...
    +\terr := client.codec.WriteRequest(&client.request, call.Args)
    +\tif err != nil {
    +\t\tclient.mutex.Lock()
    +\t\tdelete(client.pending, seq)
    +\t\tclient.mutex.Unlock()
    +\t\tcall.Error = err
    +\t\tcall.done()
    +\t}
    
    • Call 構造体から seq フィールドが削除されたため、client.seq の値をローカル変数 seq にコピーして使用するように変更されました。
    • 最も重要な変更は、client.codec.WriteRequest がエラーを返した場合の処理です。以前は Call オブジェクトが client.pending マップに残ったままになり、メモリリークの原因となっていました。この修正により、書き込みエラーが発生した場合に client.mutex を取得し、delete(client.pending, seq) を呼び出して、エラーになった Call オブジェクトを pending マップから明示的に削除するようになりました。これにより、メモリリークが防止されます。

Client.input() メソッド (src/pkg/net/rpc/client.go の変更)

  • 変数名 c から call への変更:
    -\t\t\tc := client.pending[seq]
    +\t\t\tcall := client.pending[seq]
    
    • 単なる変数名の変更ですが、コードの可読性が向上します。
  • シャットダウン処理における client.sending.Lock()client.closing のデータ競合修正:
    +\tclient.sending.Lock()
     \tclient.mutex.Lock()
     \tclient.shutdown = true
    +\tclosing := client.closing
     \tfor _, call := range client.pending {
     \t\tcall.Error = err
     \t\tcall.done()
     \t}
     \tclient.mutex.Unlock()
    -\tif err != io.EOF || !client.closing {
    +\tclient.sending.Unlock()
    +\tif err != io.EOF || !closing {
     \t\tlog.Println(\"rpc: client protocol error:\", err)
     \t}
    
    • client.sending.Lock()client.mutex.Lock() の直前に移動されました。これにより、クライアントのシャットダウン処理中に send メソッドと input メソッドの間で client.sending ミューテックスに関するデータ競合が発生する可能性が排除されます。
    • client.closing の値が client.mutex ロック内でローカル変数 closing にコピーされ、ロック解放後にこのローカル変数を使用するように変更されました。これにより、client.closing フィールドへのアクセスに関するデータ競合が解消され、シャットダウン処理の信頼性が向上します。

Client.Go() および Client.Call() メソッド (src/pkg/net/rpc/client.go の変更)

  • 冗長な client.shutdown チェックの削除:
    -\tif client.shutdown {
    -\t\tcall.Error = ErrShutdown
    -\t\tcall.done()
    -\t\treturn call
    -\t}
    
    • Client.Go() および Client.Call() メソッドから、クライアントがシャットダウン状態であるかどうかのチェックが削除されました。このチェックは client.send() メソッド内で既に適切に処理されるため、これらのメソッドでの重複したチェックは不要であり、コードの簡潔性と一貫性が向上しました。

これらの変更は、Goの並行処理モデルとミューテックスの適切な使用を通じて、net/rpc クライアントの堅牢性と信頼性を大幅に向上させています。

関連リンク

参考にした情報源リンク

  • Go言語公式ドキュメント: net/rpc パッケージ
  • Go言語公式ドキュメント: sync パッケージ
  • Go言語公式ドキュメント: channel
  • Go言語公式ドキュメント: defer
  • Go言語におけるデータ競合の概念に関する一般的な情報源 (例: Go Concurrency Patterns, The Go Programming Language)
  • Go Issue 2780 (ただし、Web検索ではこのコミットに直接関連する公開されたIssueは見つかりませんでした。これは内部的なIssueトラッカーのIDであるか、非常に古いIssueである可能性があります。)