Skip to content

Commit

Permalink
Add transport package to support CancelRequest
Browse files Browse the repository at this point in the history
Signed-off-by: Tibor Vass <[email protected]>
  • Loading branch information
Tibor Vass committed May 19, 2015
1 parent cf8c0d0 commit 73823e5
Show file tree
Hide file tree
Showing 13 changed files with 325 additions and 128 deletions.
12 changes: 7 additions & 5 deletions graph/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/transport"
"github.com/docker/docker/registry"
"github.com/docker/docker/utils"
)
Expand Down Expand Up @@ -55,16 +56,17 @@ func (s *TagStore) Pull(image string, tag string, imagePullConfig *ImagePullConf
defer s.poolRemove("pull", utils.ImageReference(repoInfo.LocalName, tag))

logrus.Debugf("pulling image from host %q with remote name %q", repoInfo.Index.Name, repoInfo.RemoteName)
endpoint, err := repoInfo.GetEndpoint()

endpoint, err := repoInfo.GetEndpoint(imagePullConfig.MetaHeaders)
if err != nil {
return err
}

// TODO(tiborvass): reuse client from endpoint?
// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
tr := &registry.DockerHeaders{
tr := transport.NewTransport(
registry.NewTransport(registry.ReceiveTimeout, endpoint.IsSecure),
imagePullConfig.MetaHeaders,
}
registry.DockerHeaders(imagePullConfig.MetaHeaders)...,
)
client := registry.HTTPClient(tr)
r, err := registry.NewSession(client, imagePullConfig.AuthConfig, endpoint)
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions graph/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/transport"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
Expand Down Expand Up @@ -509,16 +510,17 @@ func (s *TagStore) Push(localName string, imagePushConfig *ImagePushConfig) erro
}
defer s.poolRemove("push", repoInfo.LocalName)

endpoint, err := repoInfo.GetEndpoint()
endpoint, err := repoInfo.GetEndpoint(imagePushConfig.MetaHeaders)
if err != nil {
return err
}

// TODO(tiborvass): reuse client from endpoint?
// Adds Docker-specific headers as well as user-specified headers (metaHeaders)
tr := &registry.DockerHeaders{
tr := transport.NewTransport(
registry.NewTransport(registry.NoTimeout, endpoint.IsSecure),
imagePushConfig.MetaHeaders,
}
registry.DockerHeaders(imagePushConfig.MetaHeaders)...,
)
client := registry.HTTPClient(tr)
r, err := registry.NewSession(client, imagePushConfig.AuthConfig, endpoint)
if err != nil {
return err
Expand Down
27 changes: 27 additions & 0 deletions pkg/transport/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
Copyright (c) 2009 The oauth2 Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
148 changes: 148 additions & 0 deletions pkg/transport/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package transport

import (
"io"
"net/http"
"sync"
)

type RequestModifier interface {
ModifyRequest(*http.Request) error
}

type headerModifier http.Header

// NewHeaderRequestModifier returns a RequestModifier that merges the HTTP headers
// passed as an argument, with the HTTP headers of a request.
//
// If the same key is present in both, the modifying header values for that key,
// are appended to the values for that same key in the request header.
func NewHeaderRequestModifier(header http.Header) RequestModifier {
return headerModifier(header)
}

func (h headerModifier) ModifyRequest(req *http.Request) error {
for k, s := range http.Header(h) {
req.Header[k] = append(req.Header[k], s...)
}

return nil
}

// NewTransport returns an http.RoundTripper that modifies requests according to
// the RequestModifiers passed in the arguments, before sending the requests to
// the base http.RoundTripper (which, if nil, defaults to http.DefaultTransport).
func NewTransport(base http.RoundTripper, modifiers ...RequestModifier) http.RoundTripper {
return &transport{
Modifiers: modifiers,
Base: base,
}
}

// transport is an http.RoundTripper that makes HTTP requests after
// copying and modifying the request
type transport struct {
Modifiers []RequestModifier
Base http.RoundTripper

mu sync.Mutex // guards modReq
modReq map[*http.Request]*http.Request // original -> modified
}

func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
req2 := CloneRequest(req)
for _, modifier := range t.Modifiers {
if err := modifier.ModifyRequest(req2); err != nil {
return nil, err
}
}

t.setModReq(req, req2)
res, err := t.base().RoundTrip(req2)
if err != nil {
t.setModReq(req, nil)
return nil, err
}
res.Body = &OnEOFReader{
Rc: res.Body,
Fn: func() { t.setModReq(req, nil) },
}
return res, nil
}

// CancelRequest cancels an in-flight request by closing its connection.
func (t *transport) CancelRequest(req *http.Request) {
type canceler interface {
CancelRequest(*http.Request)
}
if cr, ok := t.base().(canceler); ok {
t.mu.Lock()
modReq := t.modReq[req]
delete(t.modReq, req)
t.mu.Unlock()
cr.CancelRequest(modReq)
}
}

func (t *transport) base() http.RoundTripper {
if t.Base != nil {
return t.Base
}
return http.DefaultTransport
}

func (t *transport) setModReq(orig, mod *http.Request) {
t.mu.Lock()
defer t.mu.Unlock()
if t.modReq == nil {
t.modReq = make(map[*http.Request]*http.Request)
}
if mod == nil {
delete(t.modReq, orig)
} else {
t.modReq[orig] = mod
}
}

// CloneRequest returns a clone of the provided *http.Request.
// The clone is a shallow copy of the struct and its Header map.
func CloneRequest(r *http.Request) *http.Request {
// shallow copy of the struct
r2 := new(http.Request)
*r2 = *r
// deep copy of the Header
r2.Header = make(http.Header, len(r.Header))
for k, s := range r.Header {
r2.Header[k] = append([]string(nil), s...)
}

return r2
}

// OnEOFReader ensures a callback function is called
// on Close() and when the underlying Reader returns an io.EOF error
type OnEOFReader struct {
Rc io.ReadCloser
Fn func()
}

func (r *OnEOFReader) Read(p []byte) (n int, err error) {
n, err = r.Rc.Read(p)
if err == io.EOF {
r.runFunc()
}
return
}

func (r *OnEOFReader) Close() error {
err := r.Rc.Close()
r.runFunc()
return err
}

func (r *OnEOFReader) runFunc() {
if fn := r.Fn; fn != nil {
fn()
r.Fn = nil
}
}
26 changes: 11 additions & 15 deletions registry/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ func (auth *RequestAuthorization) getToken() (string, error) {
return auth.tokenCache, nil
}

client := auth.registryEndpoint.HTTPClient()

for _, challenge := range auth.registryEndpoint.AuthChallenges {
switch strings.ToLower(challenge.Scheme) {
case "basic":
Expand All @@ -57,7 +55,7 @@ func (auth *RequestAuthorization) getToken() (string, error) {
params[k] = v
}
params["scope"] = fmt.Sprintf("%s:%s:%s", auth.resource, auth.scope, strings.Join(auth.actions, ","))
token, err := getToken(auth.authConfig.Username, auth.authConfig.Password, params, auth.registryEndpoint, client)
token, err := getToken(auth.authConfig.Username, auth.authConfig.Password, params, auth.registryEndpoint)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -104,7 +102,6 @@ func loginV1(authConfig *cliconfig.AuthConfig, registryEndpoint *Endpoint) (stri
status string
reqBody []byte
err error
client = registryEndpoint.HTTPClient()
reqStatusCode = 0
serverAddress = authConfig.ServerAddress
)
Expand All @@ -128,7 +125,7 @@ func loginV1(authConfig *cliconfig.AuthConfig, registryEndpoint *Endpoint) (stri

// using `bytes.NewReader(jsonBody)` here causes the server to respond with a 411 status.
b := strings.NewReader(string(jsonBody))
req1, err := client.Post(serverAddress+"users/", "application/json; charset=utf-8", b)
req1, err := registryEndpoint.client.Post(serverAddress+"users/", "application/json; charset=utf-8", b)
if err != nil {
return "", fmt.Errorf("Server Error: %s", err)
}
Expand All @@ -151,7 +148,7 @@ func loginV1(authConfig *cliconfig.AuthConfig, registryEndpoint *Endpoint) (stri
if string(reqBody) == "\"Username or email already exists\"" {
req, err := http.NewRequest("GET", serverAddress+"users/", nil)
req.SetBasicAuth(authConfig.Username, authConfig.Password)
resp, err := client.Do(req)
resp, err := registryEndpoint.client.Do(req)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -180,7 +177,7 @@ func loginV1(authConfig *cliconfig.AuthConfig, registryEndpoint *Endpoint) (stri
// protected, so people can use `docker login` as an auth check.
req, err := http.NewRequest("GET", serverAddress+"users/", nil)
req.SetBasicAuth(authConfig.Username, authConfig.Password)
resp, err := client.Do(req)
resp, err := registryEndpoint.client.Do(req)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -217,17 +214,16 @@ func loginV2(authConfig *cliconfig.AuthConfig, registryEndpoint *Endpoint) (stri
var (
err error
allErrors []error
client = registryEndpoint.HTTPClient()
)

for _, challenge := range registryEndpoint.AuthChallenges {
logrus.Debugf("trying %q auth challenge with params %s", challenge.Scheme, challenge.Parameters)

switch strings.ToLower(challenge.Scheme) {
case "basic":
err = tryV2BasicAuthLogin(authConfig, challenge.Parameters, registryEndpoint, client)
err = tryV2BasicAuthLogin(authConfig, challenge.Parameters, registryEndpoint)
case "bearer":
err = tryV2TokenAuthLogin(authConfig, challenge.Parameters, registryEndpoint, client)
err = tryV2TokenAuthLogin(authConfig, challenge.Parameters, registryEndpoint)
default:
// Unsupported challenge types are explicitly skipped.
err = fmt.Errorf("unsupported auth scheme: %q", challenge.Scheme)
Expand All @@ -245,15 +241,15 @@ func loginV2(authConfig *cliconfig.AuthConfig, registryEndpoint *Endpoint) (stri
return "", fmt.Errorf("no successful auth challenge for %s - errors: %s", registryEndpoint, allErrors)
}

func tryV2BasicAuthLogin(authConfig *cliconfig.AuthConfig, params map[string]string, registryEndpoint *Endpoint, client *http.Client) error {
func tryV2BasicAuthLogin(authConfig *cliconfig.AuthConfig, params map[string]string, registryEndpoint *Endpoint) error {
req, err := http.NewRequest("GET", registryEndpoint.Path(""), nil)
if err != nil {
return err
}

req.SetBasicAuth(authConfig.Username, authConfig.Password)

resp, err := client.Do(req)
resp, err := registryEndpoint.client.Do(req)
if err != nil {
return err
}
Expand All @@ -266,8 +262,8 @@ func tryV2BasicAuthLogin(authConfig *cliconfig.AuthConfig, params map[string]str
return nil
}

func tryV2TokenAuthLogin(authConfig *cliconfig.AuthConfig, params map[string]string, registryEndpoint *Endpoint, client *http.Client) error {
token, err := getToken(authConfig.Username, authConfig.Password, params, registryEndpoint, client)
func tryV2TokenAuthLogin(authConfig *cliconfig.AuthConfig, params map[string]string, registryEndpoint *Endpoint) error {
token, err := getToken(authConfig.Username, authConfig.Password, params, registryEndpoint)
if err != nil {
return err
}
Expand All @@ -279,7 +275,7 @@ func tryV2TokenAuthLogin(authConfig *cliconfig.AuthConfig, params map[string]str

req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

resp, err := client.Do(req)
resp, err := registryEndpoint.client.Do(req)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 73823e5

Please sign in to comment.