Skip to content

Commit

Permalink
Refactor daemon.attach()
Browse files Browse the repository at this point in the history
Also makes streamConfig Pipe methods not return error, since there was
no error for them to be able to return anyway.

Signed-off-by: Brian Goff <[email protected]>
  • Loading branch information
cpuguy83 committed Jan 5, 2015
1 parent b2ab733 commit 21e44d7
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 158 deletions.
169 changes: 67 additions & 102 deletions daemon/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/promise"
"github.com/docker/docker/utils"
Expand Down Expand Up @@ -114,131 +113,97 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
var (
cStdout, cStderr io.ReadCloser
cStdin io.WriteCloser
nJobs int
errors = make(chan error, 3)
)

// Connect stdin of container to the http conn.
if stdin != nil && openStdin {
cStdin = streamConfig.StdinPipe()
nJobs++
// Get the stdin pipe.
if cStdin, err := streamConfig.StdinPipe(); err != nil {
errors <- err
} else {
go func() {
log.Debugf("attach: stdin: begin")
defer log.Debugf("attach: stdin: end")
if stdinOnce && !tty {
defer cStdin.Close()
} else {
// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
defer func() {
if cStdout != nil {
cStdout.Close()
}
if cStderr != nil {
cStderr.Close()
}
}()
}
if tty {
_, err = utils.CopyEscapable(cStdin, stdin)
} else {
_, err = io.Copy(cStdin, stdin)

}
if err == io.ErrClosedPipe {
err = nil
}
if err != nil {
log.Errorf("attach: stdin: %s", err)
}
errors <- err
}()
}
}

if stdout != nil {
cStdout = streamConfig.StdoutPipe()
nJobs++
// Get a reader end of a pipe that is attached as stdout to the container.
if p, err := streamConfig.StdoutPipe(); err != nil {
errors <- err
} else {
cStdout = p
go func() {
log.Debugf("attach: stdout: begin")
defer log.Debugf("attach: stdout: end")
// If we are in StdinOnce mode, then close stdin
if stdinOnce && stdin != nil {
defer stdin.Close()
}
_, err := io.Copy(stdout, cStdout)
if err == io.ErrClosedPipe {
err = nil
}
if err != nil {
log.Errorf("attach: stdout: %s", err)
}
errors <- err
}()
}
} else {
// Point stdout of container to a no-op writer.
go func() {
if cStdout, err := streamConfig.StdoutPipe(); err != nil {
log.Errorf("attach: stdout pipe: %s", err)
} else {
io.Copy(&ioutils.NopWriter{}, cStdout)
}
}()
}

if stderr != nil {
cStderr = streamConfig.StderrPipe()
nJobs++
if p, err := streamConfig.StderrPipe(); err != nil {
errors <- err
} else {
cStderr = p
go func() {
log.Debugf("attach: stderr: begin")
defer log.Debugf("attach: stderr: end")
// If we are in StdinOnce mode, then close stdin
// Why are we closing stdin here and above while handling stdout?
if stdinOnce && stdin != nil {
defer stdin.Close()
}
_, err := io.Copy(stderr, cStderr)
if err == io.ErrClosedPipe {
err = nil
}
if err != nil {
log.Errorf("attach: stderr: %s", err)
}

errors := make(chan error, nJobs)

// Connect stdin of container to the http conn.
if stdin != nil && openStdin {
// Get the stdin pipe.
cStdin = streamConfig.StdinPipe()
go func() {
log.Debugf("attach: stdin: begin")
defer func() {
if stdinOnce && !tty {
defer cStdin.Close()
} else {
// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
if cStdout != nil {
cStdout.Close()
}
if cStderr != nil {
cStderr.Close()
}
}
errors <- err
log.Debugf("attach: stdin: end")
}()
}
} else {
// Point stderr at a no-op writer.
go func() {
if cStderr, err := streamConfig.StderrPipe(); err != nil {
log.Errorf("attach: stdout pipe: %s", err)
var err error
if tty {
_, err = utils.CopyEscapable(cStdin, stdin)
} else {
io.Copy(&ioutils.NopWriter{}, cStderr)
_, err = io.Copy(cStdin, stdin)

}
if err == io.ErrClosedPipe {
err = nil
}
if err != nil {
log.Errorf("attach: stdin: %s", err)
}
errors <- err
}()
}

return promise.Go(func() error {
attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
if stream == nil {
return
}
defer func() {
if cStdout != nil {
cStdout.Close()
}
if cStderr != nil {
cStderr.Close()
// Make sure stdin gets closed
if stdinOnce && cStdin != nil {
stdin.Close()
cStdin.Close()
}
streamPipe.Close()
}()

log.Debugf("attach: %s: begin", name)
defer log.Debugf("attach: %s: end", name)
_, err := io.Copy(stream, streamPipe)
if err == io.ErrClosedPipe {
err = nil
}
if err != nil {
log.Errorf("attach: %s: %v", name, err)
}
errors <- err
}

go attachStream("stdout", stdout, cStdout)
go attachStream("stderr", stderr, cStderr)

return promise.Go(func() error {
for i := 0; i < nJobs; i++ {
log.Debugf("attach: waiting for job %d/%d", i+1, nJobs)
if err := <-errors; err != nil {
err := <-errors
if err != nil {
log.Errorf("attach: job %d returned error %s, aborting all jobs", i+1, err)
return err
}
Expand Down
17 changes: 7 additions & 10 deletions daemon/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,7 @@ func (container *Container) Run() error {
}

func (container *Container) Output() (output []byte, err error) {
pipe, err := container.StdoutPipe()
if err != nil {
return nil, err
}
pipe := container.StdoutPipe()
defer pipe.Close()
if err := container.Start(); err != nil {
return nil, err
Expand All @@ -391,20 +388,20 @@ func (container *Container) Output() (output []byte, err error) {
// copied and delivered to all StdoutPipe and StderrPipe consumers, using
// a kind of "broadcaster".

func (streamConfig *StreamConfig) StdinPipe() (io.WriteCloser, error) {
return streamConfig.stdinPipe, nil
func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser {
return streamConfig.stdinPipe
}

func (streamConfig *StreamConfig) StdoutPipe() (io.ReadCloser, error) {
func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stdout.AddWriter(writer, "")
return ioutils.NewBufReader(reader), nil
return ioutils.NewBufReader(reader)
}

func (streamConfig *StreamConfig) StderrPipe() (io.ReadCloser, error) {
func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stderr.AddWriter(writer, "")
return ioutils.NewBufReader(reader), nil
return ioutils.NewBufReader(reader)
}

func (streamConfig *StreamConfig) StdoutLogPipe() io.ReadCloser {
Expand Down
2 changes: 1 addition & 1 deletion integration/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func TestAttachDisconnect(t *testing.T) {
}

// Try to avoid the timeout in destroy. Best effort, don't check error
cStdin, _ := container.StdinPipe()
cStdin := container.StdinPipe()
cStdin.Close()
container.WaitStop(-1 * time.Second)
}
Expand Down
40 changes: 8 additions & 32 deletions integration/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,8 @@ func TestRestartStdin(t *testing.T) {
}
defer daemon.Destroy(container)

stdin, err := container.StdinPipe()
if err != nil {
t.Fatal(err)
}
stdout, err := container.StdoutPipe()
if err != nil {
t.Fatal(err)
}
stdin := container.StdinPipe()
stdout := container.StdoutPipe()
if err := container.Start(); err != nil {
t.Fatal(err)
}
Expand All @@ -56,14 +50,8 @@ func TestRestartStdin(t *testing.T) {
}

// Restart and try again
stdin, err = container.StdinPipe()
if err != nil {
t.Fatal(err)
}
stdout, err = container.StdoutPipe()
if err != nil {
t.Fatal(err)
}
stdin = container.StdinPipe()
stdout = container.StdoutPipe()
if err := container.Start(); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -103,14 +91,8 @@ func TestStdin(t *testing.T) {
}
defer daemon.Destroy(container)

stdin, err := container.StdinPipe()
if err != nil {
t.Fatal(err)
}
stdout, err := container.StdoutPipe()
if err != nil {
t.Fatal(err)
}
stdin := container.StdinPipe()
stdout := container.StdoutPipe()
if err := container.Start(); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -149,14 +131,8 @@ func TestTty(t *testing.T) {
}
defer daemon.Destroy(container)

stdin, err := container.StdinPipe()
if err != nil {
t.Fatal(err)
}
stdout, err := container.StdoutPipe()
if err != nil {
t.Fatal(err)
}
stdin := container.StdinPipe()
stdout := container.StdoutPipe()
if err := container.Start(); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion integration/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func TestRestore(t *testing.T) {
}

// Simulate a crash/manual quit of dockerd: process dies, states stays 'Running'
cStdin, _ := container2.StdinPipe()
cStdin := container2.StdinPipe()
cStdin.Close()
if _, err := container2.WaitStop(2 * time.Second); err != nil {
t.Fatal(err)
Expand Down
15 changes: 3 additions & 12 deletions integration/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,8 @@ func containerFileExists(eng *engine.Engine, id, dir string, t Fataler) bool {

func containerAttach(eng *engine.Engine, id string, t Fataler) (io.WriteCloser, io.ReadCloser) {
c := getContainer(eng, id, t)
i, err := c.StdinPipe()
if err != nil {
t.Fatal(err)
}
o, err := c.StdoutPipe()
if err != nil {
t.Fatal(err)
}
i := c.StdinPipe()
o := c.StdoutPipe()
return i, o
}

Expand Down Expand Up @@ -292,10 +286,7 @@ func runContainer(eng *engine.Engine, r *daemon.Daemon, args []string, t *testin
return "", err
}
defer r.Destroy(container)
stdout, err := container.StdoutPipe()
if err != nil {
return "", err
}
stdout := container.StdoutPipe()
defer stdout.Close()

job := eng.Job("start", container.ID)
Expand Down

0 comments on commit 21e44d7

Please sign in to comment.