[インデックス 10564] ファイルの概要
このコミットは、Go言語の実験的なSSHパッケージ (exp/ssh
) において、SSHセッションの標準入出力 (stdin, stdout, stderr) をGoの io.Pipe
インターフェースを通じて利用可能にする StdinPipe
, StdoutPipe
, StderrPipe
メソッドを追加するものです。これにより、リモートコマンドとの間でより柔軟なストリームベースのデータ転送が可能になります。
コミット
commit c4d0ac0e2f7a12cf44f4711b47bbc5737c14ce9c
Author: Dave Cheney <dave@cheney.net>
Date: Thu Dec 1 08:30:16 2011 -0200
exp/ssh: add Std{in,out,err}Pipe methods to Session
R=gustav.paul, cw, agl, rsc, n13m3y3r
CC=golang-dev
https://golang.org/cl/5433080
GitHub上でのコミットページへのリンク
https://github.com/golang/go/commit/c4d0ac0e2f7a12cf44f4711b47bbc5737c14ce9c
元コミット内容
exp/ssh: add Std{in,out,err}Pipe methods to Session
このコミットは、Go言語の実験的なSSHパッケージ (exp/ssh
) の Session
型に、リモートコマンドの標準入出力ストリームを io.Pipe
として取得するための StdinPipe
, StdoutPipe
, StderrPipe
メソッドを追加します。これにより、ユーザーはSSHセッションを通じて実行されるリモートプロセスとの間で、より細かく制御されたストリームベースの通信を行うことができるようになります。
変更の背景
この変更以前は、exp/ssh
パッケージの Session
型は、Stdin
, Stdout
, Stderr
フィールドに io.Reader
や io.Writer
を直接割り当てることで標準入出力を扱っていました。しかし、これは柔軟性に欠けるアプローチでした。特に、リモートコマンドの実行中に動的に入出力ストリームを操作したり、非同期的にデータを読み書きしたりするような高度なシナリオには不向きでした。
io.Pipe
を介して標準入出力ストリームを提供することで、Goの標準ライブラリが提供するパイプの機能(例えば、io.Copy
を使ったストリーム間のデータ転送や、非同期処理)を最大限に活用できるようになります。これにより、SSHセッションを介したリモートコマンドの実行が、ローカルプロセスとの対話と類似した、よりGoらしいイディオムで記述できるようになることが期待されます。
前提知識の解説
SSHセッションと標準入出力
SSH (Secure Shell) は、ネットワークを介して安全にコンピュータを操作するためのプロトコルです。SSHクライアントがSSHサーバーに接続すると、通常は「セッション」が確立されます。このセッションを通じて、リモートコマンドの実行やシェルへのアクセスが可能になります。
リモートで実行されるコマンドやシェルは、ローカルのプログラムと同様に、以下の3つの標準ストリームを持っています。
- 標準入力 (stdin): プログラムがデータを読み込むための入力ストリーム。通常はキーボード入力や、別のプログラムからのパイプで提供されます。
- 標準出力 (stdout): プログラムが通常の出力(結果など)を書き込むための出力ストリーム。通常は画面に表示されるか、ファイルや別のプログラムへのパイプにリダイレクトされます。
- 標準エラー出力 (stderr): プログラムがエラーメッセージや診断情報を書き込むための出力ストリーム。通常は標準出力とは別に扱われ、エラーログなどに利用されます。
Goの io.Pipe
Go言語の io
パッケージには、Pipe()
関数があります。これは、io.Reader
と io.Writer
のペアを返します。このペアは、一方に書き込まれたデータがもう一方から読み出されるという、メモリ上のパイプを形成します。
io.PipeReader
:io.Reader
インターフェースを実装し、パイプの読み込み側を提供します。io.PipeWriter
:io.Writer
インターフェースを実装し、パイプの書き込み側を提供します。
io.Pipe
は、異なるGoroutine間でストリームデータを安全かつ効率的に受け渡す際に非常に有用です。例えば、あるGoroutineがデータを生成して io.PipeWriter
に書き込み、別のGoroutineが io.PipeReader
からそのデータを読み込むといった使い方ができます。
io.Closer
インターフェース
io.Closer
インターフェースは、Close() error
メソッドを持つインターフェースです。ファイルやネットワーク接続など、リソースを解放する必要があるオブジェクトがこのインターフェースを実装します。io.PipeReader
と io.PipeWriter
も io.Closer
を実装しており、パイプの読み書きが完了した際には Close()
を呼び出してリソースを適切に解放する必要があります。
技術的詳細
このコミットの主要な変更点は、exp/ssh
パッケージの Session
型に StdinPipe()
, StdoutPipe()
, StderrPipe()
メソッドを追加したことです。これらのメソッドは、Goの io.Pipe
を利用して、リモートコマンドの標準入出力ストリームへのアクセスを提供します。
Session
構造体の変更
Session
構造体に closeAfterWait []io.Closer
という新しいフィールドが追加されました。これは、StdinPipe
, StdoutPipe
, StderrPipe
によって作成されたパイプの読み書き側(io.PipeReader
または io.PipeWriter
)を保持するためのスライスです。セッションが終了し、Wait()
メソッドが呼び出された際に、これらの io.Closer
インターフェースを持つオブジェクトが適切に閉じられるように管理されます。これにより、リソースリークを防ぎます。
StdinPipe()
メソッド
StdinPipe()
は、リモートコマンドの標準入力に接続される io.WriteCloser
を返します。
io.Pipe()
を呼び出して、io.PipeReader
(pr) とio.PipeWriter
(pw) のペアを作成します。- セッションの
Stdin
フィールドにpr
(パイプの読み込み側) を設定します。これにより、リモートコマンドはpr
からデータを読み込むことになります。 pr
をs.closeAfterWait
スライスに追加します。これは、セッション終了時にpr
が閉じられるようにするためです。pw
(パイプの書き込み側) を呼び出し元に返します。ユーザーはこのpw
にデータを書き込むことで、リモートコマンドの標準入力にデータを送ることができます。
StdoutPipe()
および StderrPipe()
メソッド
StdoutPipe()
と StderrPipe()
は、それぞれリモートコマンドの標準出力と標準エラー出力に接続される io.ReadCloser
を返します。
io.Pipe()
を呼び出して、io.PipeReader
(pr) とio.PipeWriter
(pw) のペアを作成します。- セッションの
Stdout
またはStderr
フィールドにpw
(パイプの書き込み側) を設定します。これにより、リモートコマンドの出力はpw
に書き込まれることになります。 pw
をs.closeAfterWait
スライスに追加します。これは、セッション終了時にpw
が閉じられるようにするためです。pr
(パイプの読み込み側) を呼び出し元に返します。ユーザーはこのpr
からデータを読み込むことで、リモートコマンドの標準出力または標準エラー出力を取得できます。
StdoutPipe()
と StderrPipe()
のコメントには、重要な注意点があります。
「There is a fixed amount of buffering that is shared between stdout and stderr streams. If the StdoutPipe reader is not serviced fast enought it may eventually cause the remote command to block.」
これは、標準出力と標準エラー出力のストリーム間で共有される固定量のバッファが存在することを示しています。もし StdoutPipe
や StderrPipe
からの読み込みが十分に速く行われない場合、バッファが満杯になり、結果としてリモートコマンドがブロック(停止)する可能性があることを意味します。これは、SSHプロトコルにおけるチャンネルのフロー制御メカニズムに起因するもので、ユーザーはパイプからの読み込みを非同期的に、かつ迅速に行う必要があることを示唆しています。
Wait()
メソッドの変更
Wait()
メソッドは、リモートコマンドの終了を待機するだけでなく、s.closeAfterWait
スライスに登録されたすべての io.Closer
オブジェクトを閉じる処理が追加されました。これにより、パイプに関連するリソースがセッション終了時に確実に解放されます。
stdin()
, stdout()
, stderr()
ヘルパー関数の変更
これらの内部ヘルパー関数も、io.Copy
の挙動をより正確に制御するために微調整されました。特に stdin()
では、chanWriter
の Close()
メソッドが呼び出されるように変更され、書き込みが完了したことを適切に通知できるようになりました。
コアとなるコードの変更箇所
src/pkg/exp/ssh/session.go
--- a/src/pkg/exp/ssh/session.go
+++ b/src/pkg/exp/ssh/session.go
@@ -54,9 +54,10 @@ type Session struct {
*clientChan // the channel backing this session
- started bool // true once a Shell or Run is invoked.
- copyFuncs []func() error
- errch chan error // one send per copyFunc
+ started bool // true once Start, Run or Shell is invoked.
+ closeAfterWait []io.Closer
+ copyFuncs []func() error
+ errch chan error // one send per copyFunc
}
// RFC 4254 Section 6.4.
@@ -231,7 +232,7 @@ func (s *Session) start() error {
return nil
}
-// Wait waits for the remote command to exit.
+// Wait waits for the remote command to exit.
func (s *Session) Wait() error {
if !s.started {
return errors.New("ssh: session not started")
@@ -244,11 +245,12 @@ func (s *Session) Wait() error {
copyError = err
}
}
-\
+ for _, fd := range s.closeAfterWait {
+ fd.Close()
+ }
if waitErr != nil {
return waitErr
}
-\
return copyError
}
@@ -283,11 +285,15 @@ func (s *Session) stdin() error {
s.Stdin = new(bytes.Buffer)
}
s.copyFuncs = append(s.copyFuncs, func() error {
-\t\t_, err := io.Copy(&chanWriter{\
+\t\tw := &chanWriter{\
packetWriter: s,
peersId: s.peersId,
win: s.win,
-\t\t}, s.Stdin)\
+\t\t}\
+\t\t_, err := io.Copy(w, s.Stdin)\
+\t\tif err1 := w.Close(); err == nil {
+\t\t\terr = err1
+\t\t}\
return err
})
return nil
@@ -298,11 +304,12 @@ func (s *Session) stdout() error {
s.Stdout = ioutil.Discard
}
s.copyFuncs = append(s.copyFuncs, func() error {
-\t\t_, err := io.Copy(s.Stdout, &chanReader{\
+\t\tr := &chanReader{\
packetWriter: s,
peersId: s.peersId,
data: s.data,
-\t\t})\
+\t\t}\
+\t\t_, err := io.Copy(s.Stdout, r)\
return err
})
return nil
@@ -313,16 +320,72 @@ func (s *Session) stderr() error {
s.Stderr = ioutil.Discard
}
s.copyFuncs = append(s.copyFuncs, func() error {
-\t\t_, err := io.Copy(s.Stderr, &chanReader{\
+\t\tr := &chanReader{\
packetWriter: s,
peersId: s.peersId,
data: s.dataExt,
-\t\t})\
+\t\t}\
+\t\t_, err := io.Copy(s.Stderr, r)\
return err
})
return nil
}
+// StdinPipe returns a pipe that will be connected to the
+// remote command's standard input when the command starts.
+func (s *Session) StdinPipe() (io.WriteCloser, error) {
+ if s.Stdin != nil {
+ return nil, errors.New("ssh: Stdin already set")
+ }
+ if s.started {
+ return nil, errors.New("ssh: StdinPipe after process started")
+ }
+ pr, pw := io.Pipe()
+ s.Stdin = pr
+ s.closeAfterWait = append(s.closeAfterWait, pr)
+ return pw, nil
+}
+
+// StdoutPipe returns a pipe that will be connected to the
+// remote command's standard output when the command starts.
+// There is a fixed amount of buffering that is shared between
+// stdout and stderr streams. If the StdoutPipe reader is
+// not serviced fast enought it may eventually cause the
+// remote command to block.
+func (s *Session) StdoutPipe() (io.ReadCloser, error) {
+ if s.Stdout != nil {
+ return nil, errors.New("ssh: Stdout already set")
+ }
+ if s.started {
+ return nil, errors.New("ssh: StdoutPipe after process started")
+ }
+ pr, pw := io.Pipe()
+ s.Stdout = pw
+ s.closeAfterWait = append(s.closeAfterWait, pw)
+ return pr, nil
+}
+
+// StderrPipe returns a pipe that will be connected to the
+// remote command's standard error when the command starts.
+// There is a fixed amount of buffering that is shared between
+// stdout and stderr streams. If the StderrPipe reader is
+// not serviced fast enought it may eventually cause the
+// remote command to block.
+func (s *Session) StderrPipe() (io.ReadCloser, error) {
+ if s.Stderr != nil {
+ return nil, errors.New("ssh: Stderr already set")
+ }
+ if s.started {
+ return nil, errors.New("ssh: StderrPipe after process started")
+ }
+ pr, pw := io.Pipe()
+ s.Stderr = pw
+ s.closeAfterWait = append(s.closeAfterWait, pw)
+ return pr, nil
+}
+
+// TODO(dfc) add Output and CombinedOutput helpers
+
// NewSession returns a new interactive session on the remote host.
func (c *ClientConn) NewSession() (*Session, error) {
ch := c.newChan(c.transport)
src/pkg/exp/ssh/session_test.go
このコミットで新規追加されたテストファイルです。
--- /dev/null
+++ b/src/pkg/exp/ssh/session_test.go
@@ -0,0 +1,149 @@
+// Copyright 2011 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package ssh
+
+// Session tests.
+
+import (
+ "bytes"
+ "io"
+ "testing"
+)
+
+// dial constructs a new test server and returns a *ClientConn.
+func dial(t *testing.T) *ClientConn {
+ pw := password("tiger")
+ serverConfig.PasswordCallback = func(user, pass string) bool {
+ return user == "testuser" && pass == string(pw)
+ }
+ serverConfig.PubKeyCallback = nil
+
+ l, err := Listen("tcp", "127.0.0.1:0", serverConfig)
+ if err != nil {
+ t.Fatalf("unable to listen: %s", err)
+ }
+ go func() {
+ defer l.Close()
+ conn, err := l.Accept()
+ if err != io.EOF {
+ t.Errorf("Unable to accept: %v", err)
+ return
+ }
+ defer conn.Close()
+ if err := conn.Handshake(); err != nil {
+ t.Errorf("Unable to handshake: %v", err)
+ return
+ }
+ for {
+ ch, err := conn.Accept()
+ if err == io.EOF {
+ return
+ }
+ if err != nil {
+ t.Errorf("Unable to accept incoming channel request: %v", err)
+ return
+ }
+ if ch.ChannelType() != "session" {
+ ch.Reject(UnknownChannelType, "unknown channel type")
+ continue
+ }
+ ch.Accept()
+ go func() {
+ defer ch.Close()
+ // this string is returned to stdout
+ shell := NewServerShell(ch, "golang")
+ shell.ReadLine()
+ type exitMsg struct {
+ PeersId uint32
+ Request string
+ WantReply bool
+ Status uint32
+ }
+ // TODO(dfc) casting to the concrete type should not be
+ // necessary to send a packet.
+ msg := exitMsg{
+ PeersId: ch.(*channel).theirId,
+ Request: "exit-status",
+ WantReply: false,
+ Status: 0,
+ }
+ ch.(*channel).serverConn.writePacket(marshal(msgChannelRequest, msg))
+ }()
+ }
+ t.Log("done")
+ }()
+
+ config := &ClientConfig{
+ User: "testuser",
+ Auth: []ClientAuth{
+ ClientAuthPassword(pw),
+ },
+ }
+
+ c, err := Dial("tcp", l.Addr().String(), config)
+ if err != nil {
+ t.Fatalf("unable to dial remote side: %s", err)
+ }
+ return c
+}
+
+// Test a simple string is returned to session.Stdout.
+func TestSessionShell(t *testing.T) {
+ conn := dial(t)
+ defer conn.Close()
+ session, err := conn.NewSession()
+ if err != nil {
+ t.Fatalf("Unable to request new session: %s", err)
+ }
+ defer session.Close()
+ stdout := new(bytes.Buffer)
+ session.Stdout = stdout
+ if err := session.Shell(); err != nil {
+ t.Fatalf("Unable to execute command: %s", err)
+ }
+ if err := session.Wait(); err != nil {
+ t.Fatalf("Remote command did not exit cleanly: %s", err)
+ }
+ actual := stdout.String()
+ if actual != "golang" {
+ t.Fatalf("Remote shell did not return expected string: expected=golang, actual=%s", actual)
+ }
+}
+
+// TODO(dfc) add support for Std{in,err}Pipe when the Server supports it.
+
+// Test a simple string is returned via StdoutPipe.
+func TestSessionStdoutPipe(t *testing.H) {
+ conn := dial(t)
+ defer conn.Close()
+ session, err := conn.NewSession()
+ if err != nil {
+ t.Fatalf("Unable to request new session: %s", err)
+ }
+ defer session.Close()
+ stdout, err := session.StdoutPipe()
+ if err != nil {
+ t.Fatalf("Unable to request StdoutPipe(): %v", err)
+ }
+ var buf bytes.Buffer
+ if err := session.Shell(); err != nil {
+ t.Fatalf("Unable to execute command: %s", err)
+ }
+ done := make(chan bool, 1)
+ go func() {
+ if _, err := io.Copy(&buf, stdout); err != nil {
+ t.Errorf("Copy of stdout failed: %v", err)
+ }
+ done <- true
+ }()
+ if err := session.Wait(); err != nil {
+ t.Fatalf("Remote command did not exit cleanly: %s", err)
+ }
+ <-done
+ actual := buf.String()
+ if actual != "golang" {
+ t.Fatalf("Remote shell did not return expected string: expected=golang, actual=%s", actual)
+ }
+}
コアとなるコードの解説
Session
構造体への closeAfterWait
フィールドの追加
type Session struct {
// ...
started bool // true once Start, Run or Shell is invoked.
closeAfterWait []io.Closer // New field to hold closers for pipes
copyFuncs []func() error
errch chan error // one send per copyFunc
}
closeAfterWait
は、StdinPipe
, StdoutPipe
, StderrPipe
メソッドによって作成された io.PipeReader
や io.PipeWriter
のインスタンスを保持するために追加されました。これらのインスタンスは io.Closer
インターフェースを実装しているため、セッションの終了時に Wait()
メソッド内でまとめて Close()
が呼び出され、リソースが適切に解放されるようになります。
Wait()
メソッドの変更
func (s *Session) Wait() error {
// ... existing wait logic ...
for _, fd := range s.closeAfterWait {
fd.Close() // Close all registered closers
}
// ... existing error handling ...
return copyError
}
Wait()
メソッドの最後に、s.closeAfterWait
スライス内のすべての io.Closer
オブジェクトに対して Close()
メソッドを呼び出すループが追加されました。これにより、パイプが確実に閉じられ、関連するリソースが解放されます。
StdinPipe()
, StdoutPipe()
, StderrPipe()
メソッドの追加
これらのメソッドは、それぞれリモートコマンドの標準入出力に接続されるパイプを返します。基本的なパターンは以下の通りです。
// StdinPipe returns a pipe that will be connected to the
// remote command's standard input when the command starts.
func (s *Session) StdinPipe() (io.WriteCloser, error) {
if s.Stdin != nil {
return nil, errors.New("ssh: Stdin already set")
}
if s.started {
return nil, errors.New("ssh: StdinPipe after process started")
}
pr, pw := io.Pipe() // Create a new pipe
s.Stdin = pr // Connect the read-end of the pipe to session's Stdin
s.closeAfterWait = append(s.closeAfterWait, pr) // Register for closing
return pw, nil // Return the write-end to the user
}
// StdoutPipe returns a pipe that will be connected to the
// remote command's standard output when the command starts.
// ... (comments about buffering) ...
func (s *Session) StdoutPipe() (io.ReadCloser, error) {
if s.Stdout != nil {
return nil, errors.New("ssh: Stdout already set")
}
if s.started {
return nil, errors.New("ssh: StdoutPipe after process started")
}
pr, pw := io.Pipe() // Create a new pipe
s.Stdout = pw // Connect the write-end of the pipe to session's Stdout
s.closeAfterWait = append(s.closeAfterWait, pw) // Register for closing
return pr, nil // Return the read-end to the user
}
// StderrPipe returns a pipe that will be connected to the
// remote command's standard error when the command starts.
// ... (comments about buffering) ...
func (s *Session) StderrPipe() (io.ReadCloser, error) {
if s.Stderr != nil {
return nil, errors.New("ssh: Stderr already set")
}
if s.started {
return nil, errors.New("ssh: StderrPipe after process started")
}
pr, pw := io.Pipe() // Create a new pipe
s.Stderr = pw // Connect the write-end of the pipe to session's Stderr
s.closeAfterWait = append(s.closeAfterWait, pw) // Register for closing
return pr, nil // Return the read-end to the user
}
これらのメソッドは、以下のチェックを行います。
- 既に
Stdin
/Stdout
/Stderr
が設定されていないか。 - セッションが既に開始されていないか (
s.started
がtrue
でないか)。 これらのチェックをパスした場合、io.Pipe()
を呼び出してパイプを作成し、その適切な側(StdinPipe
の場合は読み込み側、StdoutPipe
/StderrPipe
の場合は書き込み側)をSession
の対応するフィールドに割り当てます。そして、パイプのもう一方の側をユーザーに返します。また、パイプのio.Closer
インターフェースを実装する側をs.closeAfterWait
に追加し、セッション終了時のクリーンアップを保証します。
stdin()
, stdout()
, stderr()
ヘルパー関数の変更
これらの内部関数は、Session
の Stdin
, Stdout
, Stderr
フィールドに設定された io.Reader
や io.Writer
から、SSHチャンネルへのデータコピーを処理します。変更は主に、io.Copy
の呼び出し方と、chanWriter
の Close()
メソッドの呼び出しに関するものです。
func (s *Session) stdin() error {
// ...
s.copyFuncs = append(s.copyFuncs, func() error {
w := &chanWriter{ // Assign to a variable to call Close()
packetWriter: s,
peersId: s.peersId,
win: s.win,
}
_, err := io.Copy(w, s.Stdin)
if err1 := w.Close(); err == nil { // Ensure Close() is called and its error is propagated if no other error occurred
err = err1
}
return err
})
return nil
}
func (s *Session) stdout() error {
// ...
s.copyFuncs = append(s.copyFuncs, func() error {
r := &chanReader{ // Assign to a variable
packetWriter: s,
peersId: s.peersId,
data: s.data,
}
_, err := io.Copy(s.Stdout, r) // Use the variable
return err
})
return nil
}
func (s *Session) stderr() error {
// ...
s.copyFuncs = append(s.copyFuncs, func() error {
r := &chanReader{ // Assign to a variable
packetWriter: s,
peersId: s.peersId,
data: s.dataExt,
}
_, err := io.Copy(s.Stderr, r) // Use the variable
return err
})
return nil
}
これらの変更は、chanWriter
や chanReader
のインスタンスを明示的な変数に割り当てることで、io.Copy
の呼び出し後でも Close()
メソッドを確実に呼び出せるようにするためのものです。特に stdin()
では、chanWriter
の Close()
が呼び出されることで、リモート側へのデータ送信が完了したことを適切に通知し、チャンネルの終了処理を助けます。
テストファイル session_test.go
の追加
このコミットでは、新しい機能の動作を検証するために session_test.go
が追加されました。
dial(t *testing.T) *ClientConn
: テスト用のSSHサーバーとクライアント接続をセットアップするヘルパー関数です。これにより、テストケースが独立して実行できるようになります。TestSessionShell(t *testing.T)
:Session.Shell()
メソッドの基本的な動作をテストします。リモートシェルが期待される文字列を標準出力に書き込むことを確認します。TestSessionStdoutPipe(t *testing.T)
: 新しく追加されたSession.StdoutPipe()
メソッドの動作をテストします。StdoutPipe()
から取得したio.ReadCloser
を介してリモートシェルの出力を読み込み、それが期待される文字列と一致するかどうかを検証します。このテストでは、io.Copy
を別のGoroutineで実行し、非同期的な読み込みをシミュレートしています。
関連リンク
- Go Gerrit Change 5433080: https://golang.org/cl/5433080 (このコミットの元の変更リスト)
- Go
io
パッケージドキュメント: https://pkg.go.dev/io - Go
io.Pipe
ドキュメント: https://pkg.go.dev/io#Pipe - RFC 4254 (SSH Connection Protocol): https://www.rfc-editor.org/rfc/rfc4254 (特に Section 6.4 "Channel Close" および "Data Transfer")
参考にした情報源リンク
- Go言語の公式ドキュメント (
io
パッケージ、io.Pipe
など) - SSHプロトコルに関するRFC (特にRFC 4254)
- Go言語のSSHパッケージ (
golang.org/x/crypto/ssh
の前身であるexp/ssh
のコードベース) - 一般的なSSHクライアント/サーバーの動作原理に関する知識