[インデックス 11435] ファイルの概要
このコミットは、Go言語の net/rpc
パッケージにおけるクライアントの実装に関する複数の問題を修正するものです。具体的には、Call.Error
におけるデータ競合の解消、Done
チャネルへの複数回送信の可能性の排除、一時的な Write
エラー発生時のメモリリークの修正、Client.shutdown
および Client.closing
におけるデータ競合の修正、そしてコメントの修正が含まれます。これらの修正は、RPCクライアントの堅牢性と信頼性を向上させることを目的としています。
コミット
commit 75397e65ee29e38ec89db58885fa6cf6e52ca558
Author: Dmitriy Vyukov <dvyukov@google.com>
Date: Fri Jan 27 11:27:05 2012 +0400
net/rpc: fix data race on Call.Error
+eliminates a possibility of sending a call to Done several times.
+fixes memory leak in case of temporal Write errors.
+fixes data race on Client.shutdown.
+fixes data race on Client.closing.
+fixes comments.
Fixes #2780.
R=r, rsc
CC=golang-dev, mpimenov
https://golang.org/cl/5571063
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/75397e65ee29e38ec89db58885fa6cf6e52ca558
元コミット内容
net/rpc: fix data race on Call.Error
+eliminates a possibility of sending a call to Done several times.
+fixes memory leak in case of temporal Write errors.
+fixes data race on Client.shutdown.
+fixes data race on Client.closing.
+fixes comments.
Fixes #2780.
R=r, rsc
CC=golang-dev, mpimenov
https://golang.org/cl/5571063
変更の背景
このコミットは、Go言語の net/rpc
パッケージのクライアント実装における複数の深刻なバグに対処するために行われました。主な背景は以下の通りです。
Call.Error
のデータ競合:Call
構造体のError
フィールドは、RPC呼び出しの完了後にエラー状態を保持します。複数のゴルーチンが同時にこのフィールドにアクセスし、書き込みを行う可能性があるため、データ競合が発生していました。これは予測不能な動作やクラッシュを引き起こす可能性があります。Done
チャネルへの複数回送信:Call.Done
チャネルは、RPC呼び出しが完了したことを通知するために使用されます。しかし、特定の条件下で、同じCall
オブジェクトがDone
チャネルに複数回送信される可能性がありました。これは、チャネルのセマンティクスに反し、受信側で予期せぬ動作を引き起こす可能性があります。- 一時的な
Write
エラー時のメモリリーク: クライアントがリクエストを送信する際に一時的な書き込みエラー(例: ネットワークの一時的な問題)が発生した場合、そのCall
オブジェクトがClient.pending
マップから適切に削除されず、メモリリークが発生する可能性がありました。これにより、時間とともにメモリ使用量が増加し、システムのパフォーマンスに影響を与える可能性があります。 Client.shutdown
およびClient.closing
のデータ競合: クライアントのシャットダウン処理中にClient.shutdown
およびClient.closing
フラグにアクセスする際に、複数のゴルーチン間でデータ競合が発生していました。これは、シャットダウン処理の信頼性を損ない、未定義の動作につながる可能性があります。- コメントの修正: コードの可読性と理解を向上させるために、既存のコメントが修正されました。
これらの問題は、RPCクライアントの安定性と信頼性に直接影響を与えるため、早急な修正が必要でした。特にデータ競合はデバッグが困難であり、本番環境での予期せぬ障害につながる可能性があります。Fixes #2780
は、これらの問題がGoのIssueトラッカーで報告されていたことを示唆しています。
前提知識の解説
このコミットの変更内容を理解するためには、以下のGo言語の概念と net/rpc
パッケージの基本的な知識が必要です。
Go言語の並行処理
- ゴルーチン (Goroutines): Go言語における軽量なスレッドのようなものです。非常に少ないメモリで多数のゴルーチンを同時に実行できます。
- チャネル (Channels): ゴルーチン間で値を送受信するための通信メカニズムです。チャネルは、ゴルーチン間の同期と通信を安全に行うための主要な手段です。
sync.Mutex
: 相互排他ロックを提供し、共有リソースへのアクセスを同期するために使用されます。これにより、複数のゴルーチンが同時に同じデータに書き込むことによるデータ競合を防ぎます。Lock()
でロックを取得し、Unlock()
でロックを解放します。defer
キーワードと組み合わせることで、関数の終了時に確実にロックが解放されるようにできます。- データ競合 (Data Race): 複数のゴルーチンが同時に同じメモリ位置にアクセスし、少なくとも1つのアクセスが書き込みであり、かつそれらのアクセスが同期されていない場合に発生します。データ競合は、予測不能な結果やプログラムのクラッシュを引き起こす可能性があります。
net/rpc
パッケージ
net/rpc
パッケージは、Go言語でRPC (Remote Procedure Call) クライアントとサーバーを実装するための標準ライブラリです。これにより、異なるプロセスやネットワーク上のマシン間で関数呼び出しを行うことができます。
rpc.Client
: RPCサーバーへの接続を表すクライアントオブジェクトです。rpc.Call
: 単一のRPC呼び出しを表す構造体です。ServiceMethod
: 呼び出すサービスとメソッドの名前。Args
: メソッドに渡す引数。Reply
: メソッドからの戻り値を格納するポインタ。Error
: 呼び出しが完了した後のエラー状態。Done
: 呼び出しが完了したときに値が送信されるチャネル。
Client.Call()
: 同期的にRPC呼び出しを実行し、結果が返されるまでブロックします。Client.Go()
: 非同期的にRPC呼び出しを開始し、*Call
オブジェクトをすぐに返します。呼び出しの完了はCall.Done
チャネルで通知されます。ClientCodec
インターフェース: RPCメッセージのエンコードとデコードを行うためのインターフェースです。これにより、JSON-RPCやGob-RPCなど、さまざまなプロトコルをサポートできます。client.send()
メソッド: クライアントがRPCリクエストをエンコードし、サーバーに送信する内部メソッドです。client.input()
メソッド: クライアントがサーバーからのRPCレスポンスを読み取り、対応するCall
オブジェクトを処理する内部メソッドです。
その他のGo言語の概念
io.EOF
: ファイルの終端 (End Of File) を示すエラーです。ネットワーク接続が正常に閉じられた場合など、ストリームの終端に達したことを示します。log.Println()
: 標準エラー出力にログメッセージを出力するための関数です。
これらの概念を理解することで、コミットで行われた変更がなぜ必要であり、どのように機能するのかを深く把握することができます。
技術的詳細
このコミットは、src/pkg/net/rpc/client.go
ファイルに対して行われ、主に Client
構造体と Call
構造体、およびそれらに関連するメソッドの並行処理に関する問題を解決しています。
Call
構造体の変更
--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -31,8 +31,7 @@ type Call struct {
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).\
Error error // After completion, the error status.
- Done chan *Call // Strobes when call is complete; value is the error status.
- seq uint64
+ Done chan *Call // Strobes when call is complete.
}
Call.Done
のコメントから「; value is the error status.」が削除されました。これは、Done
チャネルが*Call
オブジェクト自体を送信するだけであり、エラー状態はCall.Error
フィールドで確認されるべきであることを明確にするためです。Call.seq
フィールドがCall
構造体から削除されました。これは、seq
がClient
内部で管理されるべき情報であり、Call
オブジェクト自体が持つ必要がないためです。これにより、Call
構造体の責務がより明確になります。
Client.send()
メソッドの変更
send
メソッドは、RPCリクエストをサーバーに送信する役割を担います。このメソッドには、データ競合とメモリリークの問題がありました。
--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -65,28 +64,33 @@ type ClientCodec interface {
Close() error
}\
\
-func (client *Client) send(c *Call) {
+func (client *Client) send(call *Call) {
+\tclient.sending.Lock()
+\tdefer client.sending.Unlock()
+\
// Register this call.
client.mutex.Lock()
if client.shutdown {
-\t\tc.Error = ErrShutdown
+\t\tcall.Error = ErrShutdown
\tclient.mutex.Unlock()
-\t\tc.done()
+\t\tcall.done()
\treturn
}
-\tc.seq = client.seq
+\tseq := client.seq
client.seq++
-\tclient.pending[c.seq] = c
+\tclient.pending[seq] = call
client.mutex.Unlock()
\
// Encode and send the request.
-\tclient.sending.Lock()
-\tdefer client.sending.Unlock()
-\tclient.request.Seq = c.seq
-\tclient.request.ServiceMethod = c.ServiceMethod
-\tif err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
-\t\tc.Error = err
-\t\tc.done()
+\tclient.request.Seq = seq
+\tclient.request.ServiceMethod = call.ServiceMethod
+\terr := client.codec.WriteRequest(&client.request, call.Args)
+\tif err != nil {
+\t\tclient.mutex.Lock()
+\t\tdelete(client.pending, seq)
+\t\tclient.mutex.Unlock()
+\t\tcall.Error = err
+\t\tcall.done()
\t}
}
client.sending.Lock()
の移動: 以前はclient.request
の書き込み直前にロックを取得していましたが、変更後はsend
メソッドの冒頭でclient.sending.Lock()
を取得し、defer
で解放するように変更されました。これにより、client.request
の準備から実際の書き込みまでの一連の処理がsending
ミューテックスによって保護され、複数のゴルーチンが同時にリクエストを送信しようとした際のデータ競合が防止されます。Call.seq
のローカル変数化:c.seq
を直接使用する代わりに、seq := client.seq
としてローカル変数seq
にコピーし、このseq
をclient.pending
マップのキーとして使用するように変更されました。これにより、Call
構造体からseq
フィールドを削除することが可能になりました。- 書き込みエラー時のメモリリーク修正:
client.codec.WriteRequest
がエラーを返した場合、以前はc.Error = err
とc.done()
を呼び出すだけでした。しかし、この場合client.pending
マップからCall
オブジェクトが削除されないため、メモリリークが発生していました。修正後は、エラー発生時にclient.mutex.Lock()
を取得し、delete(client.pending, seq)
を呼び出してCall
オブジェクトをマップから明示的に削除するように変更されました。これにより、メモリリークが防止されます。
Client.input()
メソッドの変更
input
メソッドは、サーバーからのレスポンスを処理し、対応する Call
オブジェクトを完了させる役割を担います。
--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -104,36 +108,39 @@ func (client *Client) input() {
\t\t}\
\t\tseq := response.Seq
\t\tclient.mutex.Lock()
-\t\t\tc := client.pending[seq]
+\t\t\tcall := client.pending[seq]
\t\tdelete(client.pending, seq)
\t\tclient.mutex.Unlock()
\
\t\tif response.Error == "" {
-\t\t\t\terr = client.codec.ReadResponseBody(c.Reply)
+\t\t\t\terr = client.codec.ReadResponseBody(call.Reply)
\t\t\tif err != nil {
-\t\t\t\t\tc.Error = errors.New(\"reading body \" + err.Error())
+\t\t\t\t\tcall.Error = errors.New(\"reading body \" + err.Error())
\t\t\t}\
\t\t} else {
\t\t\t// We've got an error response. Give this to the request;
\t\t\t// any subsequent requests will get the ReadResponseBody
\t\t\t// error if there is one.
-\t\t\t\tc.Error = ServerError(response.Error)
+\t\t\t\tcall.Error = ServerError(response.Error)
\t\t\terr = client.codec.ReadResponseBody(nil)
\t\t\tif err != nil {
\t\t\t\terr = errors.New(\"reading error body: \" + err.Error())
\t\t\t}\
\t\t}
-\t\t\tc.done()
+\t\t\tcall.done()
\t}
\t// Terminate pending calls.
+\tclient.sending.Lock()
client.mutex.Lock()
client.shutdown = true
+\tclosing := client.closing
for _, call := range client.pending {
\t\tcall.Error = err
\t\tcall.done()
}
client.mutex.Unlock()
-\tif err != io.EOF || !client.closing {
+\tclient.sending.Unlock()
+\tif err != io.EOF || !closing {
\t\tlog.Println(\"rpc: client protocol error:\", err)
\t}
}
c
からcall
への変数名変更: 可読性向上のため、c
という変数名がcall
に変更されました。- シャットダウン処理の改善とデータ競合修正:
client.sending.Lock()
がinput
メソッドの終端、client.mutex.Lock()
の直前に移動されました。これにより、シャットダウン時にclient.sending
ミューテックスが適切に保護され、send
メソッドとのデータ競合が防止されます。client.closing
の値がclient.mutex
ロック内でローカル変数closing
にコピーされ、ロック解放後にこのローカル変数を使用するように変更されました。これにより、client.closing
フィールドへのアクセスに関するデータ競合が解消されます。
Client.Go()
および Client.Call()
メソッドの変更
これらのメソッドは、クライアントのシャットダウン状態に関する冗長なチェックを削除しました。
--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -269,20 +276,12 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface\
\t\t}\
\t}\
\tcall.Done = done
-\tif client.shutdown {
-\t\tcall.Error = ErrShutdown
-\t\tcall.done()
-\t\treturn call
-\t}\
\tclient.send(call)
\treturn call
}
\
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
-\tif client.shutdown {
-\t\treturn ErrShutdown
-\t}\
\tcall := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
\treturn call.Error
}
Client.Go()
およびClient.Call()
メソッドから、client.shutdown
の状態をチェックしてErrShutdown
を返すロジックが削除されました。このチェックはclient.send()
メソッド内で既に適切に処理されるため、冗長であり、削除することでコードの重複が解消され、一貫性が向上します。
これらの変更により、net/rpc
クライアントの並行処理の堅牢性が大幅に向上し、データ競合やメモリリークといった深刻な問題が解決されました。
コアとなるコードの変更箇所
src/pkg/net/rpc/client.go
--- a/src/pkg/net/rpc/client.go
+++ b/src/pkg/net/rpc/client.go
@@ -31,8 +31,7 @@ type Call struct {
Args interface{} // The argument to the function (*struct).
Reply interface{} // The reply from the function (*struct).
Error error // After completion, the error status.
-\tDone chan *Call // Strobes when call is complete; value is the error status.
-\tseq uint64
+\tDone chan *Call // Strobes when call is complete.
}
// Client represents an RPC Client.
@@ -65,28 +64,33 @@ type ClientCodec interface {
Close() error
}
-func (client *Client) send(c *Call) {
+func (client *Client) send(call *Call) {
+\tclient.sending.Lock()
+\tdefer client.sending.Unlock()
+\
// Register this call.
client.mutex.Lock()
if client.shutdown {
-\t\tc.Error = ErrShutdown
+\t\tcall.Error = ErrShutdown
\t\tclient.mutex.Unlock()
-\t\tc.done()
+\t\tcall.done()
\t\treturn
}
-\tc.seq = client.seq
+\tseq := client.seq
client.seq++
-\tclient.pending[c.seq] = c
+\tclient.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
-\tclient.sending.Lock()
-\tdefer client.sending.Unlock()
-\tclient.request.Seq = c.seq
-\tclient.request.ServiceMethod = c.ServiceMethod
-\tif err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
-\t\tc.Error = err
-\t\tc.done()
+\tclient.request.Seq = seq
+\tclient.request.ServiceMethod = call.ServiceMethod
+\terr := client.codec.WriteRequest(&client.request, call.Args)
+\tif err != nil {
+\t\tclient.mutex.Lock()
+\t\tdelete(client.pending, seq)
+\t\tclient.mutex.Unlock()
+\t\tcall.Error = err
+\t\tcall.done()
\t}
}
@@ -104,36 +108,39 @@ func (client *Client) input() {
\t\t}\
\t\tseq := response.Seq
\t\tclient.mutex.Lock()
-\t\tc := client.pending[seq]
+\t\tcall := client.pending[seq]
\t\tdelete(client.pending, seq)
\t\tclient.mutex.Unlock()
\t\tif response.Error == "" {
-\t\t\terr = client.codec.ReadResponseBody(c.Reply)
+\t\t\terr = client.codec.ReadResponseBody(call.Reply)
\t\t\tif err != nil {
-\t\t\t\tc.Error = errors.New(\"reading body \" + err.Error())
+\t\t\t\tcall.Error = errors.New(\"reading body \" + err.Error())
\t\t\t}\
\t\t} else {
\t\t\t// We've got an error response. Give this to the request;
\t\t\t// any subsequent requests will get the ReadResponseBody
\t\t\t// error if there is one.
-\t\t\tc.Error = ServerError(response.Error)
+\t\t\tcall.Error = ServerError(response.Error)
\t\t\terr = client.codec.ReadResponseBody(nil)
\t\t\tif err != nil {
\t\t\t\terr = errors.New(\"reading error body: \" + err.Error())
\t\t\t}\
\t\t}
-\t\tc.done()
+\t\tcall.done()
\t}
\t// Terminate pending calls.
+\tclient.sending.Lock()
\tclient.mutex.Lock()
\tclient.shutdown = true
+\tclosing := client.closing
\tfor _, call := range client.pending {
\t\tcall.Error = err
\t\tcall.done()
\t}
\tclient.mutex.Unlock()
-\tif err != io.EOF || !client.closing {
+\tclient.sending.Unlock()
+\tif err != io.EOF || !closing {
\t\tlog.Println(\"rpc: client protocol error:\", err)
\t}
}
@@ -269,20 +276,12 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface\
\t\t}\
\t}\
\tcall.Done = done
-\tif client.shutdown {
-\t\tcall.Error = ErrShutdown
-\t\tcall.done()
-\t\treturn call
-\t}\
\tclient.send(call)
\treturn call
}
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
-\tif client.shutdown {
-\t\treturn ErrShutdown
-\t}\
\tcall := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
\treturn call.Error
}
コアとなるコードの解説
Call
構造体 (src/pkg/net/rpc/client.go
の変更)
Done
チャネルのコメント修正:Done chan *Call // Strobes when call is complete.
- 以前のコメント
// Strobes when call is complete; value is the error status.
は、Done
チャネルがエラー状態を直接伝えるかのような誤解を招く可能性がありました。実際には、Done
チャネルは*Call
オブジェクト自体を送信し、エラー状態はCall
オブジェクトのError
フィールドで確認されます。この修正により、Done
チャネルの役割がより正確に記述されました。
- 以前のコメント
seq
フィールドの削除:seq uint64
が削除されました。seq
(シーケンス番号) は、RPC呼び出しを一意に識別するためにクライアント内部で管理されるべき情報であり、個々のCall
オブジェクトが持つ必要はありませんでした。この変更により、Call
構造体の責務が簡素化され、Client
構造体内でseq
の管理が一元化されました。
Client.send()
メソッド (src/pkg/net/rpc/client.go
の変更)
client.sending.Lock()
の移動とdefer
の追加:+\tclient.sending.Lock() +\tdefer client.sending.Unlock()
- 以前は
client.request
の設定とWriteRequest
の直前でロックを取得していましたが、この変更によりsend
メソッドの冒頭でclient.sending
ミューテックスが取得され、関数終了時にdefer
を使って解放されるようになりました。これにより、client.request
の準備から実際のネットワーク書き込みまでの一連の処理がclient.sending
によって完全に保護され、複数のゴルーチンが同時にリクエストを送信しようとした際のデータ競合が確実に防止されます。
- 以前は
seq
のローカル変数化とclient.pending
からの削除ロジックの追加:-\tc.seq = client.seq +\tseq := client.seq \tclient.seq++ -\tclient.pending[c.seq] = c +\tclient.pending[seq] = call ... +\terr := client.codec.WriteRequest(&client.request, call.Args) +\tif err != nil { +\t\tclient.mutex.Lock() +\t\tdelete(client.pending, seq) +\t\tclient.mutex.Unlock() +\t\tcall.Error = err +\t\tcall.done() +\t}
Call
構造体からseq
フィールドが削除されたため、client.seq
の値をローカル変数seq
にコピーして使用するように変更されました。- 最も重要な変更は、
client.codec.WriteRequest
がエラーを返した場合の処理です。以前はCall
オブジェクトがclient.pending
マップに残ったままになり、メモリリークの原因となっていました。この修正により、書き込みエラーが発生した場合にclient.mutex
を取得し、delete(client.pending, seq)
を呼び出して、エラーになったCall
オブジェクトをpending
マップから明示的に削除するようになりました。これにより、メモリリークが防止されます。
Client.input()
メソッド (src/pkg/net/rpc/client.go
の変更)
- 変数名
c
からcall
への変更:-\t\t\tc := client.pending[seq] +\t\t\tcall := client.pending[seq]
- 単なる変数名の変更ですが、コードの可読性が向上します。
- シャットダウン処理における
client.sending.Lock()
とclient.closing
のデータ競合修正:+\tclient.sending.Lock() \tclient.mutex.Lock() \tclient.shutdown = true +\tclosing := client.closing \tfor _, call := range client.pending { \t\tcall.Error = err \t\tcall.done() \t} \tclient.mutex.Unlock() -\tif err != io.EOF || !client.closing { +\tclient.sending.Unlock() +\tif err != io.EOF || !closing { \t\tlog.Println(\"rpc: client protocol error:\", err) \t}
client.sending.Lock()
がclient.mutex.Lock()
の直前に移動されました。これにより、クライアントのシャットダウン処理中にsend
メソッドとinput
メソッドの間でclient.sending
ミューテックスに関するデータ競合が発生する可能性が排除されます。client.closing
の値がclient.mutex
ロック内でローカル変数closing
にコピーされ、ロック解放後にこのローカル変数を使用するように変更されました。これにより、client.closing
フィールドへのアクセスに関するデータ競合が解消され、シャットダウン処理の信頼性が向上します。
Client.Go()
および Client.Call()
メソッド (src/pkg/net/rpc/client.go
の変更)
- 冗長な
client.shutdown
チェックの削除:-\tif client.shutdown { -\t\tcall.Error = ErrShutdown -\t\tcall.done() -\t\treturn call -\t}
Client.Go()
およびClient.Call()
メソッドから、クライアントがシャットダウン状態であるかどうかのチェックが削除されました。このチェックはclient.send()
メソッド内で既に適切に処理されるため、これらのメソッドでの重複したチェックは不要であり、コードの簡潔性と一貫性が向上しました。
これらの変更は、Goの並行処理モデルとミューテックスの適切な使用を通じて、net/rpc
クライアントの堅牢性と信頼性を大幅に向上させています。
関連リンク
- GitHubコミットページ: https://github.com/golang/go/commit/75397e65ee29e38ec89db58885fa6cf6e52ca558
- Gerrit Change-Id: https://golang.org/cl/5571063
参考にした情報源リンク
- Go言語公式ドキュメント:
net/rpc
パッケージ - Go言語公式ドキュメント:
sync
パッケージ - Go言語公式ドキュメント:
channel
- Go言語公式ドキュメント:
defer
- Go言語におけるデータ競合の概念に関する一般的な情報源 (例: Go Concurrency Patterns, The Go Programming Language)
- Go Issue 2780 (ただし、Web検索ではこのコミットに直接関連する公開されたIssueは見つかりませんでした。これは内部的なIssueトラッカーのIDであるか、非常に古いIssueである可能性があります。)