[インデックス 11435] ファイルの概要
このコミットは、Go言語の net/rpc パッケージにおけるクライアントの実装に関する複数の問題を修正するものです。具体的には、Call.Error におけるデータ競合の解消、Done チャネルへの複数回送信の可能性の排除、一時的な Write エラー発生時のメモリリークの修正、Client.shutdown および Client.closing におけるデータ競合の修正、そしてコメントの修正が含まれます。これらの修正は、RPCクライアントの堅牢性と信頼性を向上させることを目的としています。
コミット
commit 75397e65ee29e38ec89db58885fa6cf6e52ca558
Author: Dmitriy Vyukov <dvyukov@google.com>
Date: Fri Jan 27 11:27:05 2012 +0400
net/rpc: fix data race on Call.Error
+eliminates a possibility of sending a call to Done several times.
+fixes memory leak in case of temporal Write errors.
+fixes data race on Client.shutdown.
+fixes data race on Client.closing.
+fixes comments.
Fixes #2780.
R=r, rsc
CC=golang-dev, mpimenov
https://golang.org/cl/5571063
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/75397e65ee29e38ec89db58885fa6cf6e52ca558
元コミット内容
net/rpc: fix data race on Call.Error
+eliminates a possibility of sending a call to Done several times.
+fixes memory leak in case of temporal Write errors.
+fixes data race on Client.shutdown.
+fixes data race on Client.closing.
+fixes comments.
Fixes #2780.
R=r, rsc
CC=golang-dev, mpimenov
https://golang.org/cl/5571063
変更の背景
このコミットは、Go言語の net/rpc パッケージのクライアント実装における複数の深刻なバグに対処するために行われました。主な背景は以下の通りです。
Call.Errorのデータ競合:Call構造体のErrorフィールドは、RPC呼び出しの完了後にエラー状態を保持します。複数のゴルーチンが同時にこのフィールドにアクセスし、書き込みを行う可能性があるため、データ競合が発生していました。これは予測不能な動作やクラッシュを引き起こす可能性があります。Doneチャネルへの複数回送信:Call.Doneチャネルは、RPC呼び出しが完了したことを通知するために使用されます。しかし、特定の条件下で、同じCallオブジェクトがDoneチャネルに複数回送信される可能性がありました。これは、チャネルのセマンティクスに反し、受信側で予期せぬ動作を引き起こす可能性があります。- 一時的な
Writeエラー時のメモリリーク: クライアントがリクエストを送信する際に一時的な書き込みエラー(例: ネットワークの一時的な問題)が発生した場合、そのCallオブジェクトがClient.pendingマップから適切に削除されず、メモリリークが発生する可能性がありました。これにより、時間とともにメモリ使用量が増加し、システムのパフォーマンスに影響を与える可能性があります。 Client.shutdownおよびClient.closingのデータ競合: クライアントのシャットダウン処理中にClient.shutdownおよびClient.closingフラグにアクセスする際に、複数のゴルーチン間でデータ競合が発生していました。これは、シャットダウン処理の信頼性を損ない、未定義の動作につながる可能性があります。- コメントの修正: コードの可読性と理解を向上させるために、既存のコメントが修正されました。
これらの問題は、RPCクライアントの安定性と信頼性に直接影響を与えるため、早急な修正が必要でした。特にデータ競合はデバッグが困難であり、本番環境での予期せぬ障害につながる可能性があります。Fixes #2780 は、これらの問題がGoのIssueトラッカーで報告されていたことを示唆しています。
前提知識の解説
このコミットの変更内容を理解するためには、以下のGo言語の概念と net/rpc パッケージの基本的な知識が必要です。
Go言語の並行処理
- ゴルーチン (Goroutines): Go言語における軽量なスレッドのようなものです。非常に少ないメモリで多数のゴルーチンを同時に実行できます。
- チャネル (Channels): ゴルーチン間で値を送受信するための通信メカニズムです。チャネルは、ゴルーチン間の同期と通信を安全に行うための主要な手段です。
sync.Mutex: 相互排他ロックを提供し、共有リソースへのアクセスを同期するために使用されます。これにより、複数のゴルーチンが同時に同じデータに書き込むことによるデータ競合を防ぎます。Lock()でロックを取得し、Unlock()でロックを解放します。deferキーワードと組み合わせることで、関数の終了時に確実にロックが解放されるようにできます。- データ競合 (Data Race): 複数のゴルーチンが同時に同じメモリ位置にアクセスし、少なくとも1つのアクセスが書き込みであり、かつそれらのアクセスが同期されていない場合に発生します。データ競合は、予測不能な結果やプログラムのクラッシュを引き起こす可能性があります。
net/rpc パッケージ
net/rpc パッケージは、Go言語でRPC (Remote Procedure Call) クライアントとサーバーを実装するための標準ライブラリです。これにより、異なるプロセスやネットワーク上のマシン間で関数呼び出しを行うことができます。
rpc.Client: RPCサーバーへの接続を表すクライアントオブジェクトです。rpc.Call: 単一のRPC呼び出しを表す構造体です。ServiceMethod: 呼び出すサービスとメソッドの名前。Args: メソッドに渡す引数。Reply: メソッドからの戻り値を格納するポインタ。Error: 呼び出しが完了した後のエラー状態。Done: 呼び出しが完了したときに値が送信されるチャネル。
Client.Call(): 同期的にRPC呼び出しを実行し、結果が返されるまでブロックします。Client.Go(): 非同期的にRPC呼び出しを開始し、*Callオブジェクトをすぐに返します。呼び出しの完了はCall.Doneチャネルで通知されます。ClientCodecインターフェース: RPCメッセージのエンコードとデコードを行うためのインターフェースです。これにより、JSON-RPCやGob-RPCなど、さまざまなプロトコルをサポートできます。client.send()メソッド: クライアントがRPCリクエストをエンコードし、サーバーに送信する内部メソッドです。client.input()メソッド: クライアントがサーバーからのRPCレスポンスを読み取り、対応するCallオブジェクトを処理する内部メソッドです。
その他のGo言語の概念
io.EOF: ファイルの終端 (End Of File) を示すエラーです。ネットワーク接続が正常に閉じられた場合など、ストリームの終端に達したことを示します。log.Println(): 標準エラー出力にログメッセージを出力するための関数です。
これらの概念を理解することで、コミットで行われた変更がなぜ必要であり、どのように機能するのかを深く把握することができます。
技術的詳細
このコミットは、src/pkg/net/rpc/client.go ファイルに対して行われ、主に Client 構造体と Call 構造体、およびそれらに関連するメソッドの並行処理に関する問題を解決しています。
Call 構造体の変更
--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -31,8 +31,7 @@ type Call struct {
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).\
Error error // After completion, the error status.
- Done chan *Call // Strobes when call is complete; value is the error status.
- seq uint64
+ Done chan *Call // Strobes when call is complete.
}
Call.Doneのコメントから「; value is the error status.」が削除されました。これは、Doneチャネルが*Callオブジェクト自体を送信するだけであり、エラー状態はCall.Errorフィールドで確認されるべきであることを明確にするためです。Call.seqフィールドがCall構造体から削除されました。これは、seqがClient内部で管理されるべき情報であり、Callオブジェクト自体が持つ必要がないためです。これにより、Call構造体の責務がより明確になります。
Client.send() メソッドの変更
send メソッドは、RPCリクエストをサーバーに送信する役割を担います。このメソッドには、データ競合とメモリリークの問題がありました。
--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -65,28 +64,33 @@ type ClientCodec interface {
Close() error
}\
\
-func (client *Client) send(c *Call) {
+func (client *Client) send(call *Call) {
+\tclient.sending.Lock()
+\tdefer client.sending.Unlock()
+\
// Register this call.
client.mutex.Lock()
if client.shutdown {
-\t\tc.Error = ErrShutdown
+\t\tcall.Error = ErrShutdown
\tclient.mutex.Unlock()
-\t\tc.done()
+\t\tcall.done()
\treturn
}
-\tc.seq = client.seq
+\tseq := client.seq
client.seq++
-\tclient.pending[c.seq] = c
+\tclient.pending[seq] = call
client.mutex.Unlock()
\
// Encode and send the request.
-\tclient.sending.Lock()
-\tdefer client.sending.Unlock()
-\tclient.request.Seq = c.seq
-\tclient.request.ServiceMethod = c.ServiceMethod
-\tif err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
-\t\tc.Error = err
-\t\tc.done()
+\tclient.request.Seq = seq
+\tclient.request.ServiceMethod = call.ServiceMethod
+\terr := client.codec.WriteRequest(&client.request, call.Args)
+\tif err != nil {
+\t\tclient.mutex.Lock()
+\t\tdelete(client.pending, seq)
+\t\tclient.mutex.Unlock()
+\t\tcall.Error = err
+\t\tcall.done()
\t}
}
client.sending.Lock()の移動: 以前はclient.requestの書き込み直前にロックを取得していましたが、変更後はsendメソッドの冒頭でclient.sending.Lock()を取得し、deferで解放するように変更されました。これにより、client.requestの準備から実際の書き込みまでの一連の処理がsendingミューテックスによって保護され、複数のゴルーチンが同時にリクエストを送信しようとした際のデータ競合が防止されます。Call.seqのローカル変数化:c.seqを直接使用する代わりに、seq := client.seqとしてローカル変数seqにコピーし、このseqをclient.pendingマップのキーとして使用するように変更されました。これにより、Call構造体からseqフィールドを削除することが可能になりました。- 書き込みエラー時のメモリリーク修正:
client.codec.WriteRequestがエラーを返した場合、以前はc.Error = errとc.done()を呼び出すだけでした。しかし、この場合client.pendingマップからCallオブジェクトが削除されないため、メモリリークが発生していました。修正後は、エラー発生時にclient.mutex.Lock()を取得し、delete(client.pending, seq)を呼び出してCallオブジェクトをマップから明示的に削除するように変更されました。これにより、メモリリークが防止されます。
Client.input() メソッドの変更
input メソッドは、サーバーからのレスポンスを処理し、対応する Call オブジェクトを完了させる役割を担います。
--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -104,36 +108,39 @@ func (client *Client) input() {
\t\t}\
\t\tseq := response.Seq
\t\tclient.mutex.Lock()
-\t\t\tc := client.pending[seq]
+\t\t\tcall := client.pending[seq]
\t\tdelete(client.pending, seq)
\t\tclient.mutex.Unlock()
\
\t\tif response.Error == "" {
-\t\t\t\terr = client.codec.ReadResponseBody(c.Reply)
+\t\t\t\terr = client.codec.ReadResponseBody(call.Reply)
\t\t\tif err != nil {
-\t\t\t\t\tc.Error = errors.New(\"reading body \" + err.Error())
+\t\t\t\t\tcall.Error = errors.New(\"reading body \" + err.Error())
\t\t\t}\
\t\t} else {
\t\t\t// We've got an error response. Give this to the request;
\t\t\t// any subsequent requests will get the ReadResponseBody
\t\t\t// error if there is one.
-\t\t\t\tc.Error = ServerError(response.Error)
+\t\t\t\tcall.Error = ServerError(response.Error)
\t\t\terr = client.codec.ReadResponseBody(nil)
\t\t\tif err != nil {
\t\t\t\terr = errors.New(\"reading error body: \" + err.Error())
\t\t\t}\
\t\t}
-\t\t\tc.done()
+\t\t\tcall.done()
\t}
\t// Terminate pending calls.
+\tclient.sending.Lock()
client.mutex.Lock()
client.shutdown = true
+\tclosing := client.closing
for _, call := range client.pending {
\t\tcall.Error = err
\t\tcall.done()
}
client.mutex.Unlock()
-\tif err != io.EOF || !client.closing {
+\tclient.sending.Unlock()
+\tif err != io.EOF || !closing {
\t\tlog.Println(\"rpc: client protocol error:\", err)
\t}
}
cからcallへの変数名変更: 可読性向上のため、cという変数名がcallに変更されました。- シャットダウン処理の改善とデータ競合修正:
client.sending.Lock()がinputメソッドの終端、client.mutex.Lock()の直前に移動されました。これにより、シャットダウン時にclient.sendingミューテックスが適切に保護され、sendメソッドとのデータ競合が防止されます。client.closingの値がclient.mutexロック内でローカル変数closingにコピーされ、ロック解放後にこのローカル変数を使用するように変更されました。これにより、client.closingフィールドへのアクセスに関するデータ競合が解消されます。
Client.Go() および Client.Call() メソッドの変更
これらのメソッドは、クライアントのシャットダウン状態に関する冗長なチェックを削除しました。
--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -269,20 +276,12 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface\
\t\t}\
\t}\
\tcall.Done = done
-\tif client.shutdown {
-\t\tcall.Error = ErrShutdown
-\t\tcall.done()
-\t\treturn call
-\t}\
\tclient.send(call)
\treturn call
}
\
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
-\tif client.shutdown {
-\t\treturn ErrShutdown
-\t}\
\tcall := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
\treturn call.Error
}
Client.Go()およびClient.Call()メソッドから、client.shutdownの状態をチェックしてErrShutdownを返すロジックが削除されました。このチェックはclient.send()メソッド内で既に適切に処理されるため、冗長であり、削除することでコードの重複が解消され、一貫性が向上します。
これらの変更により、net/rpc クライアントの並行処理の堅牢性が大幅に向上し、データ競合やメモリリークといった深刻な問題が解決されました。
コアとなるコードの変更箇所
src/pkg/net/rpc/client.go
--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -31,8 +31,7 @@ type Call struct {
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).
Error error // After completion, the error status.
-\tDone chan *Call // Strobes when call is complete; value is the error status.
-\tseq uint64
+\tDone chan *Call // Strobes when call is complete.
}
// Client represents an RPC Client.
@@ -65,28 +64,33 @@ type ClientCodec interface {
Close() error
}
-func (client *Client) send(c *Call) {
+func (client *Client) send(call *Call) {
+\tclient.sending.Lock()
+\tdefer client.sending.Unlock()
+\
// Register this call.
client.mutex.Lock()
if client.shutdown {
-\t\tc.Error = ErrShutdown
+\t\tcall.Error = ErrShutdown
\t\tclient.mutex.Unlock()
-\t\tc.done()
+\t\tcall.done()
\t\treturn
}
-\tc.seq = client.seq
+\tseq := client.seq
client.seq++
-\tclient.pending[c.seq] = c
+\tclient.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
-\tclient.sending.Lock()
-\tdefer client.sending.Unlock()
-\tclient.request.Seq = c.seq
-\tclient.request.ServiceMethod = c.ServiceMethod
-\tif err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
-\t\tc.Error = err
-\t\tc.done()
+\tclient.request.Seq = seq
+\tclient.request.ServiceMethod = call.ServiceMethod
+\terr := client.codec.WriteRequest(&client.request, call.Args)
+\tif err != nil {
+\t\tclient.mutex.Lock()
+\t\tdelete(client.pending, seq)
+\t\tclient.mutex.Unlock()
+\t\tcall.Error = err
+\t\tcall.done()
\t}
}
@@ -104,36 +108,39 @@ func (client *Client) input() {
\t\t}\
\t\tseq := response.Seq
\t\tclient.mutex.Lock()
-\t\tc := client.pending[seq]
+\t\tcall := client.pending[seq]
\t\tdelete(client.pending, seq)
\t\tclient.mutex.Unlock()
\t\tif response.Error == "" {
-\t\t\terr = client.codec.ReadResponseBody(c.Reply)
+\t\t\terr = client.codec.ReadResponseBody(call.Reply)
\t\t\tif err != nil {
-\t\t\t\tc.Error = errors.New(\"reading body \" + err.Error())
+\t\t\t\tcall.Error = errors.New(\"reading body \" + err.Error())
\t\t\t}\
\t\t} else {
\t\t\t// We've got an error response. Give this to the request;
\t\t\t// any subsequent requests will get the ReadResponseBody
\t\t\t// error if there is one.
-\t\t\tc.Error = ServerError(response.Error)
+\t\t\tcall.Error = ServerError(response.Error)
\t\t\terr = client.codec.ReadResponseBody(nil)
\t\t\tif err != nil {
\t\t\t\terr = errors.New(\"reading error body: \" + err.Error())
\t\t\t}\
\t\t}
-\t\tc.done()
+\t\tcall.done()
\t}
\t// Terminate pending calls.
+\tclient.sending.Lock()
\tclient.mutex.Lock()
\tclient.shutdown = true
+\tclosing := client.closing
\tfor _, call := range client.pending {
\t\tcall.Error = err
\t\tcall.done()
\t}
\tclient.mutex.Unlock()
-\tif err != io.EOF || !client.closing {
+\tclient.sending.Unlock()
+\tif err != io.EOF || !closing {
\t\tlog.Println(\"rpc: client protocol error:\", err)
\t}
}
@@ -269,20 +276,12 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface\
\t\t}\
\t}\
\tcall.Done = done
-\tif client.shutdown {
-\t\tcall.Error = ErrShutdown
-\t\tcall.done()
-\t\treturn call
-\t}\
\tclient.send(call)
\treturn call
}
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
-\tif client.shutdown {
-\t\treturn ErrShutdown
-\t}\
\tcall := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
\treturn call.Error
}
コアとなるコードの解説
Call 構造体 (src/pkg/net/rpc/client.go の変更)
Doneチャネルのコメント修正:Done chan *Call // Strobes when call is complete.- 以前のコメント
// Strobes when call is complete; value is the error status.は、Doneチャネルがエラー状態を直接伝えるかのような誤解を招く可能性がありました。実際には、Doneチャネルは*Callオブジェクト自体を送信し、エラー状態はCallオブジェクトのErrorフィールドで確認されます。この修正により、Doneチャネルの役割がより正確に記述されました。
- 以前のコメント
seqフィールドの削除:seq uint64が削除されました。seq(シーケンス番号) は、RPC呼び出しを一意に識別するためにクライアント内部で管理されるべき情報であり、個々のCallオブジェクトが持つ必要はありませんでした。この変更により、Call構造体の責務が簡素化され、Client構造体内でseqの管理が一元化されました。
Client.send() メソッド (src/pkg/net/rpc/client.go の変更)
client.sending.Lock()の移動とdeferの追加:+\tclient.sending.Lock() +\tdefer client.sending.Unlock()- 以前は
client.requestの設定とWriteRequestの直前でロックを取得していましたが、この変更によりsendメソッドの冒頭でclient.sendingミューテックスが取得され、関数終了時にdeferを使って解放されるようになりました。これにより、client.requestの準備から実際のネットワーク書き込みまでの一連の処理がclient.sendingによって完全に保護され、複数のゴルーチンが同時にリクエストを送信しようとした際のデータ競合が確実に防止されます。
- 以前は
seqのローカル変数化とclient.pendingからの削除ロジックの追加:-\tc.seq = client.seq +\tseq := client.seq \tclient.seq++ -\tclient.pending[c.seq] = c +\tclient.pending[seq] = call ... +\terr := client.codec.WriteRequest(&client.request, call.Args) +\tif err != nil { +\t\tclient.mutex.Lock() +\t\tdelete(client.pending, seq) +\t\tclient.mutex.Unlock() +\t\tcall.Error = err +\t\tcall.done() +\t}Call構造体からseqフィールドが削除されたため、client.seqの値をローカル変数seqにコピーして使用するように変更されました。- 最も重要な変更は、
client.codec.WriteRequestがエラーを返した場合の処理です。以前はCallオブジェクトがclient.pendingマップに残ったままになり、メモリリークの原因となっていました。この修正により、書き込みエラーが発生した場合にclient.mutexを取得し、delete(client.pending, seq)を呼び出して、エラーになったCallオブジェクトをpendingマップから明示的に削除するようになりました。これにより、メモリリークが防止されます。
Client.input() メソッド (src/pkg/net/rpc/client.go の変更)
- 変数名
cからcallへの変更:-\t\t\tc := client.pending[seq] +\t\t\tcall := client.pending[seq]- 単なる変数名の変更ですが、コードの可読性が向上します。
- シャットダウン処理における
client.sending.Lock()とclient.closingのデータ競合修正:+\tclient.sending.Lock() \tclient.mutex.Lock() \tclient.shutdown = true +\tclosing := client.closing \tfor _, call := range client.pending { \t\tcall.Error = err \t\tcall.done() \t} \tclient.mutex.Unlock() -\tif err != io.EOF || !client.closing { +\tclient.sending.Unlock() +\tif err != io.EOF || !closing { \t\tlog.Println(\"rpc: client protocol error:\", err) \t}client.sending.Lock()がclient.mutex.Lock()の直前に移動されました。これにより、クライアントのシャットダウン処理中にsendメソッドとinputメソッドの間でclient.sendingミューテックスに関するデータ競合が発生する可能性が排除されます。client.closingの値がclient.mutexロック内でローカル変数closingにコピーされ、ロック解放後にこのローカル変数を使用するように変更されました。これにより、client.closingフィールドへのアクセスに関するデータ競合が解消され、シャットダウン処理の信頼性が向上します。
Client.Go() および Client.Call() メソッド (src/pkg/net/rpc/client.go の変更)
- 冗長な
client.shutdownチェックの削除:-\tif client.shutdown { -\t\tcall.Error = ErrShutdown -\t\tcall.done() -\t\treturn call -\t}Client.Go()およびClient.Call()メソッドから、クライアントがシャットダウン状態であるかどうかのチェックが削除されました。このチェックはclient.send()メソッド内で既に適切に処理されるため、これらのメソッドでの重複したチェックは不要であり、コードの簡潔性と一貫性が向上しました。
これらの変更は、Goの並行処理モデルとミューテックスの適切な使用を通じて、net/rpc クライアントの堅牢性と信頼性を大幅に向上させています。
関連リンク
- GitHubコミットページ: https://github.com/golang/go/commit/75397e65ee29e38ec89db58885fa6cf6e52ca558
- Gerrit Change-Id: https://golang.org/cl/5571063
参考にした情報源リンク
- Go言語公式ドキュメント:
net/rpcパッケージ - Go言語公式ドキュメント:
syncパッケージ - Go言語公式ドキュメント:
channel - Go言語公式ドキュメント:
defer - Go言語におけるデータ競合の概念に関する一般的な情報源 (例: Go Concurrency Patterns, The Go Programming Language)
- Go Issue 2780 (ただし、Web検索ではこのコミットに直接関連する公開されたIssueは見つかりませんでした。これは内部的なIssueトラッカーのIDであるか、非常に古いIssueである可能性があります。)