From 617b887209bd22ae5abcacbbb04622586a250e6d Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 1 May 2014 14:53:52 -0400 Subject: [PATCH] Make the client use a "transport" abstraction to talk to server Decouple client -> server from knowing about the particular details of how data flows back and forth from the server. Use a consistent Locator interface for referring to hosts that is generic across possible transports. --- cmd/deployment.go | 46 +++ cmd/executor.go | 63 ++-- cmd/executor_test.go | 135 +++++++++ cmd/{gear => }/flags.go | 4 +- cmd/flags_test.go | 13 + cmd/gear/commands.go | 284 +++++++++--------- cmd/job_response.go | 9 +- cmd/locator.go | 170 +++++++++++ cmd/remote.go | 277 ----------------- contrib/geard-image.service | 16 + deployment/deployment.go | 41 ++- deployment/deployment_test.go | 59 ++-- .../fixtures/mongo_deploy_existing.json | 15 +- deployment/instance.go | 66 ++-- deployment/links.go | 13 +- deployment/placement.go | 21 +- deployment/ports.go | 2 + git/cmd/cmd.go | 26 +- git/cmd/extend.go | 2 +- git/http/handlers.go | 14 +- http/extend.go | 9 + http/extension.go | 9 +- http/handlers.go | 16 +- http/job_response.go | 46 ++- http/remote.go | 118 ++++++-- http/server.go | 4 +- jobs/delete_container.go | 7 +- jobs/jobs.go | 3 +- jobs/link_containers.go | 5 + jobs/list_units.go | 11 +- ssh/cmd/cmd.go | 38 +-- ssh/cmd/extend.go | 3 +- ssh/http/handlers.go | 13 +- tests/integration_test.go | 6 +- transport/locator.go | 95 ++++++ transport/transport.go | 77 +++++ 36 files changed, 1062 insertions(+), 674 deletions(-) create mode 100644 cmd/deployment.go create mode 100644 cmd/executor_test.go rename cmd/{gear => }/flags.go (96%) create mode 100644 cmd/flags_test.go create mode 100644 cmd/locator.go delete mode 100644 cmd/remote.go create mode 100644 contrib/geard-image.service create mode 100644 http/extend.go create mode 100644 transport/locator.go create mode 100644 transport/transport.go diff --git a/cmd/deployment.go b/cmd/deployment.go new file mode 100644 index 00000000..218412ec --- /dev/null +++ b/cmd/deployment.go @@ -0,0 +1,46 @@ +package cmd + +import ( + "errors" + "fmt" + "github.com/openshift/geard/deployment" + "github.com/openshift/geard/transport" +) + +// Return a set of container locators from the specified deployment +// descriptor. +func ExtractContainerLocatorsFromDeployment(t transport.Transport, path string, args *[]string) error { + if path == "" { + return nil + } + deployment, err := deployment.NewDeploymentFromFile(path) + if err != nil { + return err + } + locators, err := LocatorsForDeploymentInstances(t, deployment.Instances.References()) + if err != nil { + return err + } + if len(locators) == 0 { + return errors.New(fmt.Sprintf("There are no deployed instances listed in %s", path)) + } + for i := range locators { + *args = append(*args, locators[i].Identity()) + } + return nil +} + +func LocatorsForDeploymentInstances(t transport.Transport, instances deployment.InstanceRefs) (Locators, error) { + locators := make(Locators, 0, len(instances)) + for _, instance := range instances { + if instance.On != nil { + locator, err := t.LocatorFor(*instance.On) + if err != nil { + return Locators{}, err + } + resource := &ResourceLocator{ResourceTypeContainer, string(instance.Id), locator} + locators = append(locators, resource) + } + } + return locators, nil +} diff --git a/cmd/executor.go b/cmd/executor.go index 82e57db1..5d9d786b 100644 --- a/cmd/executor.go +++ b/cmd/executor.go @@ -3,14 +3,13 @@ package cmd import ( "errors" "fmt" - "github.com/openshift/geard/http" "github.com/openshift/geard/jobs" "github.com/openshift/geard/pkg/logstreamer" + "github.com/openshift/geard/transport" "io" "io/ioutil" "log" "os" - "reflect" "strings" "sync" ) @@ -27,7 +26,7 @@ type FuncReact func(*CliJobResponse, io.Writer, interface{}) // parallel or sequentially. You must set either .Group // or .Serial type Executor struct { - On []Locator + On Locators // Given a set of locators on the same server, create one // job that represents all ids. Group FuncBulk @@ -42,6 +41,9 @@ type Executor struct { OnSuccess FuncReact // Optional: respond to errors when they occur OnFailure FuncReact + // Optional: a way to transport a job to a remote server. If not + // specified remote locators will fail + Transport transport.Transport } // Invoke the appropriate job on each server and return the set of data @@ -106,7 +108,7 @@ func (e Executor) StreamAndExit() { func (e *Executor) run(gather bool) ([]*CliJobResponse, error) { on := e.On - local, remote := Locators(on).Group() + local, remote := on.Group() single := len(on) == 1 responses := []*CliJobResponse{} @@ -115,15 +117,20 @@ func (e *Executor) run(gather bool) ([]*CliJobResponse, error) { if err := localJobs.check(); err != nil { return responses, err } - remoteJobs := make([]remoteJobSet, len(remote)) + remoteJobs := make([][]remoteJob, len(remote)) for i := range remote { - jobs := e.jobs(remote[i]) + locator := remote[i] + jobs := e.jobs(locator) if err := jobs.check(); err != nil { return responses, err } - remotes, err := jobs.remotes() - if err != nil { - return responses, err + remotes := make([]remoteJob, len(jobs)) + for j := range jobs { + remote, err := e.Transport.RemoteJobFor(locator[0].TransportLocator(), jobs[j]) + if err != nil { + return responses, err + } + remotes[j] = remoteJob{remote, jobs[j], locator[0]} } remoteJobs[i] = remotes } @@ -155,30 +162,22 @@ func (e *Executor) run(gather bool) ([]*CliJobResponse, error) { }() } - // Executes jobs against each remote server in parallel + // Executes jobs against each remote server in parallel (could parallel to each server if necessary) for i := range remote { ids := remote[i] allJobs := remoteJobs[i] - host := ids[0].HostIdentity() - locator := ids[0].(http.RemoteLocator) + host := ids[0].TransportLocator() tasks.Add(1) go func() { - w := logstreamer.NewLogstreamer(stdout, prefixUnless(host+" ", single), false) - logger := log.New(w, "", 0) + w := logstreamer.NewLogstreamer(stdout, prefixUnless(host.String()+" ", single), false) defer w.Close() defer tasks.Done() - dispatcher := http.NewHttpDispatcher(locator, logger) for _, job := range allJobs { response := &CliJobResponse{Output: w, Gather: gather} - if err := dispatcher.Dispatch(job, response); err != nil { - // set an explicit error - response = &CliJobResponse{ - Error: jobs.SimpleJobError{jobs.JobResponseError, fmt.Sprintf("The server did not respond correctly: %s", err.Error())}, - } - } - respch <- e.react(response, w, job) + job.Execute(response) + respch <- e.react(response, w, job.Original) } }() } @@ -226,7 +225,11 @@ func (e *Executor) jobs(on []Locator) jobSet { } type jobSet []jobs.Job -type remoteJobSet []http.RemoteExecutable +type remoteJob struct { + jobs.Job + Original jobs.Job + Locator Locator +} func (jobs jobSet) check() error { for i := range jobs { @@ -240,20 +243,6 @@ func (jobs jobSet) check() error { return nil } -func (jobs jobSet) remotes() (remotes remoteJobSet, err error) { - remotes = make(remoteJobSet, 0, len(remotes)) - for i := range jobs { - job := jobs[i] - remotable, ok := job.(http.RemoteExecutable) - if !ok { - err = errors.New(fmt.Sprintf("Unable to run this action (%+v) against a remote server", reflect.TypeOf(job))) - return - } - remotes = append(remotes, remotable) - } - return -} - func Fail(code int, format string, other ...interface{}) { fmt.Fprintf(os.Stderr, format, other...) if !strings.HasSuffix(format, "\n") { diff --git a/cmd/executor_test.go b/cmd/executor_test.go new file mode 100644 index 00000000..ed4009ab --- /dev/null +++ b/cmd/executor_test.go @@ -0,0 +1,135 @@ +package cmd_test + +import ( + "fmt" + . "github.com/openshift/geard/cmd" + "github.com/openshift/geard/jobs" + "github.com/openshift/geard/transport" + "testing" +) + +type testLocator struct { + Locator string +} + +func (t *testLocator) String() string { + return t.Locator +} +func (t *testLocator) ResolveHostname() (string, error) { + return t.Locator, nil +} + +type testTransport struct { + GotLocator string + Translated map[string]jobs.Job + Invoked map[string]jobs.JobResponse +} + +func (t *testTransport) LocatorFor(locator string) (transport.Locator, error) { + t.GotLocator = locator + return &testLocator{locator}, nil +} +func (t *testTransport) RemoteJobFor(locator transport.Locator, job jobs.Job) (jobs.Job, error) { + if t.Translated == nil { + t.Translated = make(map[string]jobs.Job) + t.Invoked = make(map[string]jobs.JobResponse) + } + t.Translated[locator.String()] = job + invoked := func(res jobs.JobResponse) { + if _, found := t.Invoked[locator.String()]; found { + panic(fmt.Sprintf("Same job %+v invoked twice under %s", job, locator.String())) + } + t.Invoked[locator.String()] = res + res.Success(jobs.JobResponseOk) + } + return jobs.JobFunction(invoked), nil +} + +func TestShouldSendRemoteJob(t *testing.T) { + trans := &testTransport{} + localhost := &testLocator{"localhost"} + initCalled := false + locator := &ResourceLocator{ResourceTypeContainer, "foobar", localhost} + + Executor{ + On: Locators{locator}, + Serial: func(on Locator) jobs.Job { + if on != locator { + t.Fatalf("Expected locator passed to Serial() to be identical to %+v", locator) + } + return &jobs.StoppedContainerStateRequest{ + Id: AsIdentifier(on), + } + }, + LocalInit: func() error { + initCalled = true + return nil + }, + Transport: trans, + }.Gather() + + if initCalled { + t.Errorf("Local initialization should be bypassed for remote transports") + } + if _, ok := trans.Translated["localhost"]; !ok { + t.Errorf("Job for localhost was not enqueued in %+v", trans.Invoked) + } + if _, ok := trans.Invoked["localhost"]; !ok { + t.Errorf("Job for localhost was not enqueued in %+v", trans.Invoked) + } +} + +func TestShouldExtractLocators(t *testing.T) { + trans := &testTransport{} + args := []string{} + err := ExtractContainerLocatorsFromDeployment(trans, "../deployment/fixtures/mongo_deploy_existing.json", &args) + if err != nil { + t.Fatalf("Expected no error from extract: %+v", err) + } + if len(args) != 3 { + t.Fatalf("Expected args to have 3 locators, not %d", len(args)) + } +} + +func TestShouldConvertLocator(t *testing.T) { + locator := &ResourceLocator{ResourceTypeContainer, "foobar", &testLocator{"localhost"}} + id := AsIdentifier(locator) + if id == "" { + t.Errorf("Locator should not have error on converting to identifier") + } +} + +func TestShouldReadIdentifiersFromArgs(t *testing.T) { + ids, err := NewResourceLocators(&testTransport{}, ResourceTypeContainer, "ctr://localhost/foo", "bart", "ctr://local/bazi") + if err != nil { + t.Errorf("No error should occur reading locators: %s", err.Error()) + } + if len(ids) != 3 { + t.Errorf("Should have received 3 ids: %+v", ids) + } + if string(AsIdentifier(ids[0])) != "" { + t.Error("First id should have value '' because foo is too short", ids[0]) + } + if string(AsIdentifier(ids[1])) != "bart" { + t.Error("Second id should have value 'bart'", ids[1]) + } + if string(AsIdentifier(ids[2])) != "bazi" { + t.Error("Third id should have value 'bazi'", ids[2]) + } +} + +func TestShoulCheckContainerArgsArgs(t *testing.T) { + ids, err := NewContainerLocators(&testTransport{}, "ctr://localhost/foo") + if err == nil { + t.Errorf("This locator should be invalid: %s", ids[0]) + } + ids, err = NewContainerLocators(&testTransport{}, "bar") + if err == nil { + t.Errorf("This locator should be invalid: %s", ids[0]) + } + ids, err = NewContainerLocators(&testTransport{}, "ctr://local/baz") + if err == nil { + t.Errorf("This locator should be invalid: %s", ids[0]) + } + +} diff --git a/cmd/gear/flags.go b/cmd/flags.go similarity index 96% rename from cmd/gear/flags.go rename to cmd/flags.go index 4681023c..a5dc6f09 100644 --- a/cmd/gear/flags.go +++ b/cmd/flags.go @@ -1,4 +1,4 @@ -package main +package cmd import ( "crypto/rand" @@ -89,7 +89,7 @@ func (e *EnvironmentDescription) ExtractVariablesFrom(args *[]string, generateId } env, err := containers.ExtractEnvironmentVariablesFrom(args) if err != nil { - log.Printf("Failed to extract env") + fmt.Fprintln(os.Stderr, "Failed to extract env: "+err.Error()) return err } e.Description.Variables = append(e.Description.Variables, env...) diff --git a/cmd/flags_test.go b/cmd/flags_test.go new file mode 100644 index 00000000..def7d7ff --- /dev/null +++ b/cmd/flags_test.go @@ -0,0 +1,13 @@ +package cmd_test + +import ( + . "github.com/openshift/geard/cmd" + "testing" +) + +func TestGenerateId(t *testing.T) { + s := GenerateId() + if s == "" { + t.Error("Expected generated ID to be non empty") + } +} diff --git a/cmd/gear/commands.go b/cmd/gear/commands.go index 175cec45..815c2f85 100644 --- a/cmd/gear/commands.go +++ b/cmd/gear/commands.go @@ -10,10 +10,10 @@ import ( "os" "path/filepath" "regexp" - "strings" "time" "github.com/openshift/docker-source-to-images/go" + "github.com/openshift/geard/cleanup" . "github.com/openshift/geard/cmd" "github.com/openshift/geard/config" "github.com/openshift/geard/containers" @@ -24,8 +24,8 @@ import ( "github.com/openshift/geard/jobs" "github.com/openshift/geard/port" "github.com/openshift/geard/systemd" + "github.com/openshift/geard/transport" "github.com/spf13/cobra" - "github.com/openshift/geard/cleanup" ) var ( @@ -61,6 +61,8 @@ var ( dryRun bool repair bool + + defaultTransport transport.TransportFlag ) var conf = http.HttpConfiguration{ @@ -78,6 +80,10 @@ var ( needsData = LocalInitializers(containers.InitializeData) ) +func init() { + defaultTransport.Set("http") +} + // Parse the command line arguments and invoke one of the support subcommands. func Execute() { gearCmd := &cobra.Command{ @@ -91,6 +97,7 @@ func Execute() { gearCmd.PersistentFlags().BoolVar(&(config.SystemDockerFeatures.EnvironmentFile), "has-env-file", false, "(experimental) Use --env-file with Docker, requires master from Apr 1st") gearCmd.PersistentFlags().BoolVar(&(config.SystemDockerFeatures.ForegroundRun), "has-foreground", false, "(experimental) Use --foreground with Docker, requires alexlarsson/forking-run") gearCmd.PersistentFlags().StringVar(&deploymentPath, "with", "", "Provide a deployment descriptor to operate on") + gearCmd.PersistentFlags().Var(&defaultTransport, "transport", "Specify an alternate mechanism to connect to the gear agent") deployCmd := &cobra.Command{ Use: "deploy ...", @@ -288,11 +295,11 @@ func deployContainers(cmd *cobra.Command, args []string) { Fail(1, "Unable to load deployment file: %s", err.Error()) } if len(args) == 1 { - args = append(args, LocalHostName) + args = append(args, transport.Local.String()) } - servers, err := NewHostLocators(args[1:]...) + servers, err := transport.NewTransportLocators(defaultTransport.Get(), args[1:]...) if err != nil { - Fail(1, "You must pass zero or more valid host names (use '%s' or pass no arguments for the current server): %s\n", LocalHostName, err.Error()) + Fail(1, "You must pass zero or more valid host names (use '%s' or pass no arguments for the current server): %s", transport.Local.String(), err.Error()) } re := regexp.MustCompile("\\.\\d{8}\\-\\d{6}\\z") @@ -301,21 +308,24 @@ func deployContainers(cmd *cobra.Command, args []string) { base = re.ReplaceAllString(base, "") newPath := base + now - log.Printf("Deploying %s", newPath) - changes, removed, err := deploy.Describe(deployment.SimplePlacement(servers)) + fmt.Printf("Deploying %s", newPath) + changes, removed, err := deploy.Describe(deployment.SimplePlacement(servers), defaultTransport.Get()) if err != nil { Fail(1, "Deployment is not valid: %s", err.Error()) } if len(removed) > 0 { + removedIds, err := LocatorsForDeploymentInstances(defaultTransport.Get(), removed) + if err != nil { + Fail(1, "Unable to generate deployment info: %s", err.Error()) + } + failures := Executor{ - On: removed.Ids(), + On: removedIds, Serial: func(on Locator) jobs.Job { - return &http.HttpDeleteContainerRequest{ + return &jobs.DeleteContainerRequest{ + Id: AsIdentifier(on), Label: on.Identity(), - DeleteContainerRequest: jobs.DeleteContainerRequest{ - Id: on.(ResourceLocator).Identifier(), - }, } }, Output: os.Stdout, @@ -323,34 +333,36 @@ func deployContainers(cmd *cobra.Command, args []string) { fmt.Fprintf(w, "Deleted %s", job.(jobs.LabeledJob).JobLabel()) }, LocalInit: needsSystemdAndData, + Transport: defaultTransport.Get(), }.Stream() for i := range failures { fmt.Fprintf(os.Stderr, failures[i].Error()) } } - addedIds := changes.Instances.AddedIds() + addedIds, err := LocatorsForDeploymentInstances(defaultTransport.Get(), changes.Instances.Added()) + if err != nil { + Fail(1, "Unable to generate deployment info: %s", err.Error()) + } errors := Executor{ On: addedIds, Serial: func(on Locator) jobs.Job { - instance, _ := changes.Instances.Find(on.(ResourceLocator).Identifier()) + instance, _ := changes.Instances.Find(AsIdentifier(on)) links := instance.NetworkLinks() - return &http.HttpInstallContainerRequest{ - InstallContainerRequest: jobs.InstallContainerRequest{ - RequestIdentifier: jobs.NewRequestIdentifier(), + return &jobs.InstallContainerRequest{ + RequestIdentifier: jobs.NewRequestIdentifier(), - Id: instance.Id, - Image: instance.Image, - Isolate: isolate, + Id: instance.Id, + Image: instance.Image, + Isolate: isolate, - Ports: instance.Ports.PortPairs(), - NetworkLinks: &links, - }, + Ports: instance.Ports.PortPairs(), + NetworkLinks: &links, } }, OnSuccess: func(r *CliJobResponse, w io.Writer, job interface{}) { - installJob := job.(*http.HttpInstallContainerRequest) + installJob := job.(*jobs.InstallContainerRequest) instance, _ := changes.Instances.Find(installJob.Id) if pairs, ok := installJob.PortMappingsFrom(r.Pending); ok { if !instance.Ports.Update(pairs) { @@ -360,6 +372,7 @@ func deployContainers(cmd *cobra.Command, args []string) { }, Output: os.Stdout, LocalInit: needsSystemdAndData, + Transport: defaultTransport.Get(), }.Stream() changes.UpdateLinks() @@ -378,38 +391,41 @@ func deployContainers(cmd *cobra.Command, args []string) { fmt.Fprintf(os.Stderr, "Unable to write %s: %s\n", newPath, err.Error()) } + linkedIds, err := LocatorsForDeploymentInstances(defaultTransport.Get(), changes.Instances.Linked()) + if err != nil { + Fail(1, "Unable to generate deployment info: %s", err.Error()) + } + Executor{ - On: changes.Instances.LinkedIds(), + On: linkedIds, Group: func(on ...Locator) jobs.Job { links := []jobs.ContainerLink{} for i := range on { - instance, _ := changes.Instances.Find(on[i].(ResourceLocator).Identifier()) + instance, _ := changes.Instances.Find(AsIdentifier(on[i])) network := instance.NetworkLinks() if len(network) > 0 { links = append(links, jobs.ContainerLink{instance.Id, network}) } } - return &http.HttpLinkContainersRequest{ - Label: on[0].HostIdentity(), - LinkContainersRequest: jobs.LinkContainersRequest{&jobs.ContainerLinks{links}}, - } + + return &jobs.LinkContainersRequest{&jobs.ContainerLinks{links}, on[0].TransportLocator().String()} }, Output: os.Stdout, OnSuccess: func(r *CliJobResponse, w io.Writer, job interface{}) { fmt.Fprintf(w, "Links set on %s\n", job.(jobs.LabeledJob).JobLabel()) }, + Transport: defaultTransport.Get(), }.Stream() Executor{ On: addedIds, Serial: func(on Locator) jobs.Job { - return &http.HttpStartContainerRequest{ - StartedContainerStateRequest: jobs.StartedContainerStateRequest{ - Id: on.(ResourceLocator).Identifier(), - }, + return &jobs.StartedContainerStateRequest{ + Id: AsIdentifier(on), } }, - Output: os.Stdout, + Output: os.Stdout, + Transport: defaultTransport.Get(), }.Stream() if len(errors) > 0 { @@ -426,46 +442,45 @@ func installImage(cmd *cobra.Command, args []string) { } if len(args) < 2 { - Fail(1, "Valid arguments: ...\n") + Fail(1, "Valid arguments: ...") } imageId := args[0] if imageId == "" { - Fail(1, "Argument 1 must be a Docker image to base the service on\n") + Fail(1, "Argument 1 must be a Docker image to base the service on") } - ids, err := NewContainerLocators(args[1:]...) + ids, err := NewContainerLocators(defaultTransport.Get(), args[1:]...) if err != nil { - Fail(1, "You must pass one or more valid service names: %s\n", err.Error()) + Fail(1, "You must pass one or more valid service names: %s", err.Error()) } - suffix := "/" + imageId for _, locator := range ids { - if strings.HasSuffix(locator.Identity(), suffix) { - Fail(1, "Image name and container id must not be the same: %s\n", imageId) + if imageId == string(AsIdentifier(locator)) { + Fail(1, "Image name and container id must not be the same: %s", imageId) } } Executor{ On: ids, Serial: func(on Locator) jobs.Job { - return &http.HttpInstallContainerRequest{ - InstallContainerRequest: jobs.InstallContainerRequest{ - RequestIdentifier: jobs.NewRequestIdentifier(), - - Id: on.(ResourceLocator).Identifier(), - Image: imageId, - Started: start, - Isolate: isolate, - SocketActivation: sockAct, - - Ports: *portPairs.Get().(*port.PortPairs), - Environment: &environment.Description, - NetworkLinks: networkLinks.NetworkLinks, - }, + r := jobs.InstallContainerRequest{ + RequestIdentifier: jobs.NewRequestIdentifier(), + + Id: AsIdentifier(on), + Image: imageId, + Started: start, + Isolate: isolate, + SocketActivation: sockAct, + + Ports: *portPairs.Get().(*port.PortPairs), + Environment: &environment.Description, + NetworkLinks: networkLinks.NetworkLinks, } + return &r }, Output: os.Stdout, LocalInit: needsSystemdAndData, + Transport: defaultTransport.Get(), }.StreamAndExit() } @@ -512,53 +527,50 @@ func setEnvironment(cmd *cobra.Command, args []string) { } if len(args) < 1 { - Fail(1, "Valid arguments: ... =...\n") + Fail(1, "Valid arguments: ... =...") } - ids, err := NewContainerLocators(args[0:]...) + ids, err := NewContainerLocators(defaultTransport.Get(), args...) if err != nil { - Fail(1, "You must pass one or more valid service names: %s\n", err.Error()) + Fail(1, "You must pass one or more valid service names: %s", err.Error()) } Executor{ On: ids, Serial: func(on Locator) jobs.Job { - environment.Description.Id = on.(ResourceLocator).Identifier() + environment.Description.Id = AsIdentifier(on) if resetEnv { - return &http.HttpPutEnvironmentRequest{ - PutEnvironmentRequest: jobs.PutEnvironmentRequest{environment.Description}, - } - } - return &http.HttpPatchEnvironmentRequest{ - PatchEnvironmentRequest: jobs.PatchEnvironmentRequest{environment.Description}, + return &jobs.PutEnvironmentRequest{environment.Description} } + + return &jobs.PatchEnvironmentRequest{environment.Description} }, Output: os.Stdout, LocalInit: needsSystemdAndData, + Transport: defaultTransport.Get(), }.StreamAndExit() } func showEnvironment(cmd *cobra.Command, args []string) { if len(args) < 1 { - Fail(1, "Valid arguments: ...\n") + Fail(1, "Valid arguments: ...") } - ids, err := NewContainerLocators(args...) + ids, err := NewContainerLocators(defaultTransport.Get(), args[0:]...) if err != nil { - Fail(1, "You must pass one or more valid environment ids: %s\n", err.Error()) + Fail(1, "You must pass one or more valid environment ids: %s", err.Error()) } data, errors := Executor{ On: ids, Serial: func(on Locator) jobs.Job { - return &http.HttpContentRequest{ - ContentRequest: jobs.ContentRequest{ - Locator: string(on.(ResourceLocator).Identifier()), - Type: jobs.ContentTypeEnvironment, - }, + return &jobs.ContentRequest{ + Locator: string(AsIdentifier(on)), + Type: jobs.ContentTypeEnvironment, } }, LocalInit: needsData, Output: os.Stdout, + Transport: defaultTransport.Get(), }.Gather() for i := range data { @@ -576,42 +588,43 @@ func showEnvironment(cmd *cobra.Command, args []string) { } func deleteContainer(cmd *cobra.Command, args []string) { - if err := deployment.ExtractContainerLocatorsFromDeployment(deploymentPath, &args); err != nil { + if err := ExtractContainerLocatorsFromDeployment(defaultTransport.Get(), deploymentPath, &args); err != nil { Fail(1, err.Error()) } + if len(args) < 1 { - Fail(1, "Valid arguments: ...\n") + Fail(1, "Valid arguments: ...") } - ids, err := NewContainerLocators(args...) + + ids, err := NewContainerLocators(defaultTransport.Get(), args...) if err != nil { - Fail(1, "You must pass one or more valid service names: %s\n", err.Error()) + Fail(1, "You must pass one or more valid service names: %s", err.Error()) } Executor{ On: ids, Serial: func(on Locator) jobs.Job { - return &http.HttpDeleteContainerRequest{ + return &jobs.DeleteContainerRequest{ + Id: AsIdentifier(on), Label: on.Identity(), - DeleteContainerRequest: jobs.DeleteContainerRequest{ - Id: on.(ResourceLocator).Identifier(), - }, } }, Output: os.Stdout, OnSuccess: func(r *CliJobResponse, w io.Writer, job interface{}) { - fmt.Fprintf(w, "Deleted %s", job.(jobs.LabeledJob).JobLabel()) + fmt.Fprintf(w, "Deleted %s", string(job.(*jobs.DeleteContainerRequest).Id)) }, LocalInit: needsSystemdAndData, + Transport: defaultTransport.Get(), }.StreamAndExit() } func linkContainers(cmd *cobra.Command, args []string) { if len(args) < 1 { - Fail(1, "Valid arguments: ...\n") + Fail(1, "Valid arguments: ...") } - ids, err := NewContainerLocators(args...) + ids, err := NewContainerLocators(defaultTransport.Get(), args...) if err != nil { - Fail(1, "You must pass one or more valid service names: %s\n", err.Error()) + Fail(1, "You must pass one or more valid service names: %s", err.Error()) } if networkLinks.NetworkLinks == nil { networkLinks.NetworkLinks = &containers.NetworkLinks{} @@ -623,138 +636,120 @@ func linkContainers(cmd *cobra.Command, args []string) { links := &jobs.ContainerLinks{make([]jobs.ContainerLink, 0, len(on))} buf := bytes.Buffer{} for i := range on { - links.Links = append(links.Links, jobs.ContainerLink{on[i].(ResourceLocator).Identifier(), *networkLinks.NetworkLinks}) + links.Links = append(links.Links, jobs.ContainerLink{AsIdentifier(on[i]), *networkLinks.NetworkLinks}) if i > 0 { buf.WriteString(", ") } buf.WriteString(on[i].Identity()) } - return &http.HttpLinkContainersRequest{ - Label: buf.String(), - LinkContainersRequest: jobs.LinkContainersRequest{links}, - } + return &jobs.LinkContainersRequest{links, buf.String()} }, Output: os.Stdout, LocalInit: needsData, OnSuccess: func(r *CliJobResponse, w io.Writer, job interface{}) { - fmt.Fprintf(w, "Links set on %s\n", job.(*http.HttpLinkContainersRequest).Label) + fmt.Fprintf(w, "Links set on %s\n", job.(jobs.LabeledJob).JobLabel()) }, + Transport: defaultTransport.Get(), }.StreamAndExit() } func startContainer(cmd *cobra.Command, args []string) { - if err := deployment.ExtractContainerLocatorsFromDeployment(deploymentPath, &args); err != nil { + if err := ExtractContainerLocatorsFromDeployment(defaultTransport.Get(), deploymentPath, &args); err != nil { Fail(1, err.Error()) } if len(args) < 1 { - Fail(1, "Valid arguments: ...\n") + Fail(1, "Valid arguments: ...") } - ids, err := NewContainerLocators(args...) + ids, err := NewContainerLocators(defaultTransport.Get(), args...) if err != nil { - Fail(1, "You must pass one or more valid service names: %s\n", err.Error()) + Fail(1, "You must pass one or more valid service names: %s", err.Error()) } - if len(ids) == 1 && !ids[0].IsRemote() { - fmt.Fprintf(os.Stderr, "You can also control this container via 'systemctl start %s'\n", ids[0].(ResourceLocator).Identifier().UnitNameFor()) - } Executor{ On: ids, Serial: func(on Locator) jobs.Job { - return &http.HttpStartContainerRequest{ - StartedContainerStateRequest: jobs.StartedContainerStateRequest{ - Id: on.(ResourceLocator).Identifier(), - }, + return &jobs.StartedContainerStateRequest{ + Id: AsIdentifier(on), } }, Output: os.Stdout, LocalInit: needsSystemd, + Transport: defaultTransport.Get(), }.StreamAndExit() } func stopContainer(cmd *cobra.Command, args []string) { - if err := deployment.ExtractContainerLocatorsFromDeployment(deploymentPath, &args); err != nil { + if err := ExtractContainerLocatorsFromDeployment(defaultTransport.Get(), deploymentPath, &args); err != nil { Fail(1, err.Error()) } if len(args) < 1 { - Fail(1, "Valid arguments: ...\n") + Fail(1, "Valid arguments: ...") } - ids, err := NewContainerLocators(args...) + ids, err := NewContainerLocators(defaultTransport.Get(), args...) if err != nil { - Fail(1, "You must pass one or more valid service names: %s\n", err.Error()) + Fail(1, "You must pass one or more valid service names: %s", err.Error()) } - if len(ids) == 1 && !ids[0].IsRemote() { - fmt.Fprintf(os.Stderr, "You can also control this container via 'systemctl stop %s'\n", ids[0].(ResourceLocator).Identifier().UnitNameFor()) - } Executor{ On: ids, Serial: func(on Locator) jobs.Job { - return &http.HttpStopContainerRequest{ - StoppedContainerStateRequest: jobs.StoppedContainerStateRequest{ - Id: on.(ResourceLocator).Identifier(), - }, + return &jobs.StoppedContainerStateRequest{ + Id: AsIdentifier(on), } }, Output: os.Stdout, LocalInit: needsSystemd, + Transport: defaultTransport.Get(), }.StreamAndExit() } func restartContainer(cmd *cobra.Command, args []string) { - if err := deployment.ExtractContainerLocatorsFromDeployment(deploymentPath, &args); err != nil { + if err := ExtractContainerLocatorsFromDeployment(defaultTransport.Get(), deploymentPath, &args); err != nil { Fail(1, err.Error()) } if len(args) < 1 { - Fail(1, "Valid arguments: ...\n") + Fail(1, "Valid arguments: ...") } - ids, err := NewContainerLocators(args...) + ids, err := NewContainerLocators(defaultTransport.Get(), args...) if err != nil { - Fail(1, "You must pass one or more valid service names: %s\n", err.Error()) + Fail(1, "You must pass one or more valid service names: %s", err.Error()) } - if len(ids) == 1 && !ids[0].IsRemote() { - fmt.Fprintf(os.Stderr, "You can also control this container via 'systemctl restart %s'\n", ids[0].(ResourceLocator).Identifier().UnitNameFor()) - } Executor{ On: ids, Serial: func(on Locator) jobs.Job { - return &http.HttpRestartContainerRequest{ - RestartContainerRequest: jobs.RestartContainerRequest{ - Id: on.(ResourceLocator).Identifier(), - }, + return &jobs.RestartContainerRequest{ + Id: AsIdentifier(on), } }, Output: os.Stdout, LocalInit: needsSystemd, + Transport: defaultTransport.Get(), }.StreamAndExit() } func containerStatus(cmd *cobra.Command, args []string) { - if err := deployment.ExtractContainerLocatorsFromDeployment(deploymentPath, &args); err != nil { + if err := ExtractContainerLocatorsFromDeployment(defaultTransport.Get(), deploymentPath, &args); err != nil { Fail(1, err.Error()) } if len(args) < 1 { - Fail(1, "Valid arguments: ...\n") + Fail(1, "Valid arguments: ...") } - ids, err := NewContainerLocators(args...) + ids, err := NewContainerLocators(defaultTransport.Get(), args...) if err != nil { - Fail(1, "You must pass one or more valid service names: %s\n", err.Error()) + Fail(1, "You must pass one or more valid service names: %s", err.Error()) } - if len(ids) == 1 && !ids[0].IsRemote() { - fmt.Fprintf(os.Stderr, "You can also display the status of this container via 'systemctl status %s'\n", ids[0].(ResourceLocator).Identifier().UnitNameFor()) - } data, errors := Executor{ On: ids, Serial: func(on Locator) jobs.Job { - return &http.HttpContainerStatusRequest{ - ContainerStatusRequest: jobs.ContainerStatusRequest{ - Id: on.(ResourceLocator).Identifier(), - }, + return &jobs.ContainerStatusRequest{ + Id: AsIdentifier(on), } }, Output: os.Stdout, LocalInit: needsSystemd, + Transport: defaultTransport.Get(), }.Gather() for i := range data { @@ -776,26 +771,21 @@ func containerStatus(cmd *cobra.Command, args []string) { func listUnits(cmd *cobra.Command, args []string) { if len(args) == 0 { - args = []string{LocalHostName} + args = []string{transport.Local.String()} } - ids, err := NewHostLocators(args...) + servers, err := NewHostLocators(defaultTransport.Get(), args[0:]...) if err != nil { - Fail(1, "You must pass zero or more valid host names (use '%s' or pass no arguments for the current server): %s\n", LocalHostName, err.Error()) + Fail(1, "You must pass zero or more valid host names (use '%s' or pass no arguments for the current server): %s", transport.Local.String(), err.Error()) } - if len(ids) == 1 && !ids[0].IsRemote() { - fmt.Fprintf(os.Stderr, "You can also display the set of containers via 'systemctl list-units'\n") - } data, errors := Executor{ - On: ids, + On: servers, Group: func(on ...Locator) jobs.Job { - return &http.HttpListContainersRequest{ - Label: string(on[0].HostIdentity()), - ListContainersRequest: jobs.ListContainersRequest{}, - } + return &jobs.ListContainersRequest{on[0].TransportLocator().String()} }, Output: os.Stdout, LocalInit: needsSystemd, + Transport: defaultTransport.Get(), }.Gather() combined := http.ListContainersResponse{} diff --git a/cmd/job_response.go b/cmd/job_response.go index fb50e263..e8576d0a 100644 --- a/cmd/job_response.go +++ b/cmd/job_response.go @@ -21,7 +21,7 @@ type CliJobResponse struct { // Data gathered from the response Data interface{} // The error set on the response - Error jobs.JobError + Error error succeeded bool failed bool @@ -70,11 +70,6 @@ func (s *CliJobResponse) SuccessWithWrite(t jobs.JobResponseSuccess, flush, stru return utils.NewWriteFlusher(s.Output) } -func (s *CliJobResponse) WriteClosed() <-chan bool { - ch := make(chan bool) - return ch -} - func (s *CliJobResponse) WritePendingSuccess(name string, value interface{}) { if s.Pending == nil { s.Pending = make(map[string]interface{}) @@ -82,7 +77,7 @@ func (s *CliJobResponse) WritePendingSuccess(name string, value interface{}) { s.Pending[name] = value } -func (s *CliJobResponse) Failure(e jobs.JobError) { +func (s *CliJobResponse) Failure(e error) { if s.succeeded { panic("May not invoke failure after Success()") } diff --git a/cmd/locator.go b/cmd/locator.go new file mode 100644 index 00000000..7bd3e8cf --- /dev/null +++ b/cmd/locator.go @@ -0,0 +1,170 @@ +package cmd + +import ( + "errors" + "github.com/openshift/geard/containers" + "github.com/openshift/geard/transport" + "strings" +) + +type Locator interface { + // The location of this resource + TransportLocator() transport.Locator + // Two resources are identical if they have the same + // identity value. + Identity() string +} +type Locators []Locator + +// Group resource locators by their transport location +func (locators Locators) Group() (local Locators, remote []Locators) { + local = make(Locators, 0, len(locators)) + groups := make(map[string]Locators) + for i := range locators { + locator := locators[i].TransportLocator() + if locator != transport.Local { + remotes, ok := groups[locator.String()] + if !ok { + remotes = make(Locators, 0, 2) + } + groups[locator.String()] = append(remotes, locators[i]) + } else { + local = append(local, locators[i]) + } + } + remote = make([]Locators, 0, len(groups)) + for k := range groups { + remotes := groups[k] + remote = append(remote, remotes) + } + return +} + +/* +func (locators Locators) Has(locator Locator) bool { + for i := range locators { + if locators[i].Identity() == locator.Identity() { + return true + } + } + return false +} +*/ + +// Disambiguate resources with the same id via their type +type ResourceType string + +// A container resource +const ResourceTypeContainer ResourceType = "ctr" + +type ResourceValidator interface { + Type() ResourceType +} + +// The a resource on a server reachable via a transport. +type ResourceLocator struct { + // The type of resource being referenced + Type ResourceType + // The identifier of the resource at this location + Id string + // A label representing a server (varies by transport) + At transport.Locator +} +type ResourceLocators []ResourceLocator + +func (r *ResourceLocator) TransportLocator() transport.Locator { + return r.At +} +func (r *ResourceLocator) Identity() string { + base := r.Id + if r.At != transport.Local { + base = r.At.String() + "/" + base + } + if r.Type != "" { + base = string(r.Type) + "://" + base + } + return base +} +func AsIdentifier(locator Locator) containers.Identifier { + id, _ := containers.NewIdentifier(locator.(*ResourceLocator).Id) + return id +} + +func NewResourceLocators(t transport.Transport, defaultType ResourceType, values ...string) (Locators, error) { + out := make(Locators, 0, len(values)) + for i := range values { + r, err := NewResourceLocator(t, defaultType, values[i]) + if err != nil { + return out, err + } + out = append(out, r) + } + return out, nil +} +func NewResourceLocator(t transport.Transport, defaultType ResourceType, value string) (Locator, error) { + res, host, id, errs := SplitTypeHostSuffix(value) + if errs != nil { + return nil, errs + } + if res == "" { + res = defaultType + } + locator, err := t.LocatorFor(host) + if err != nil { + return nil, err + } + return &ResourceLocator{ResourceType(res), id, locator}, nil +} + +func NewContainerLocators(t transport.Transport, values ...string) (Locators, error) { + locators, err := NewResourceLocators(t, ResourceTypeContainer, values...) + if err != nil { + return Locators{}, err + } + for i := range locators { + _, err := containers.NewIdentifier(locators[i].(*ResourceLocator).Id) + if err != nil { + return Locators{}, err + } + } + return locators, nil +} + +// Given a command line string representing a resource, break it into type, host identity, and suffix +func SplitTypeHostSuffix(value string) (res ResourceType, host string, suffix string, err error) { + if value == "" { + err = errors.New("The identifier must be specified as / or ") + return + } + + locatorParts := strings.SplitN(value, "://", 2) + if len(locatorParts) == 2 { + res = ResourceType(locatorParts[0]) + value = locatorParts[1] + } + + sections := strings.SplitN(value, "/", 2) + if len(sections) == 1 { + suffix = sections[0] + return + } + if strings.TrimSpace(sections[0]) == "" { + err = errors.New("You must specify / or ") + return + } + host = sections[0] + suffix = sections[1] + return +} + +func NewHostLocators(t transport.Transport, values ...string) (Locators, error) { + out := make(Locators, 0, len(values)) + for i := range values { + r, err := t.LocatorFor(values[i]) + if err != nil { + return out, err + } + out = append(out, &ResourceLocator{"", "", r}) + } + return out, nil +} diff --git a/cmd/remote.go b/cmd/remote.go deleted file mode 100644 index e71276a7..00000000 --- a/cmd/remote.go +++ /dev/null @@ -1,277 +0,0 @@ -package cmd - -import ( - "errors" - "fmt" - "github.com/openshift/geard/containers" - "github.com/openshift/geard/port" - "log" - "net" - "net/url" - "strconv" - "strings" -) - -const ( - LocalHostName = "local" - ResourceTypeContainer ResourceType = "ctr" -) - -type ResourceType string - -type Locator interface { - ResourceType() ResourceType - IsRemote() bool - Identity() string - HostIdentity() string -} -type Locators []Locator -type ResourceLocator interface { - Identifier() containers.Identifier -} - -func LocatorsAreEqual(a, b Locator) bool { - return a.Identity() == b.Identity() -} - -func (locators Locators) Has(locator Locator) bool { - for i := range locators { - if locators[i].Identity() == locator.Identity() { - return true - } - } - return false -} - -func (locators Locators) Group() (local Locators, remote []Locators) { - local = make(Locators, 0, len(locators)) - groups := make(map[string]Locators) - for i := range locators { - locator := locators[i] - if locator.IsRemote() { - remotes, ok := groups[locator.HostIdentity()] - if !ok { - remotes = make(Locators, 0, 2) - } - groups[locator.HostIdentity()] = append(remotes, locator) - } else { - local = append(local, locator) - } - } - remote = make([]Locators, 0, len(groups)) - for k := range groups { - remotes := groups[k] - remote = append(remote, remotes) - } - return -} - -type HostLocator struct { - Host string - Port port.Port -} - -type ContainerLocator struct { - HostLocator - Id containers.Identifier -} - -type GenericLocator struct { - HostLocator - Id containers.Identifier - Type ResourceType -} - -func NewHostLocators(values ...string) (Locators, error) { - out := make(Locators, 0, len(values)) - for i := range values { - r, err := NewHostLocator(values[i]) - if err != nil { - return out, err - } - out = append(out, r) - } - return out, nil -} - -func NewHostLocator(value string) (*HostLocator, error) { - if strings.Contains(value, "/") { - return nil, errors.New("Host identifiers may not have a slash") - } - if value == "" || value == LocalHostName { - return &HostLocator{}, nil - } - - id := &HostLocator{} - if strings.Contains(value, ":") { - host, portString, err := net.SplitHostPort(value) - if err != nil { - return nil, err - } - if portString != "" { - p, err := strconv.Atoi(portString) - if err != nil { - return nil, err - } - id.Port = port.Port(p) - if err := id.Port.Check(); err != nil { - return nil, err - } - } - id.Host = host - } else { - id.Host = value - } - return id, nil -} - -func splitTypeHostId(value string) (res ResourceType, host string, id containers.Identifier, err error) { - if value == "" { - err = errors.New("The identifier must be specified as / or ") - return - } - - locatorParts := strings.SplitN(value, "://", 2) - if len(locatorParts) == 2 { - res = ResourceType(locatorParts[0]) - value = locatorParts[1] - } - - sections := strings.SplitN(value, "/", 2) - if len(sections) == 1 { - id, err = containers.NewIdentifier(sections[0]) - return - } - id, err = containers.NewIdentifier(sections[1]) - if err != nil { - return - } - if strings.TrimSpace(sections[0]) == "" { - err = errors.New("You must specify / or ") - return - } - host = sections[0] - return -} - -func NewContainerLocators(values ...string) (Locators, error) { - out := make(Locators, 0, len(values)) - for i := range values { - r, err := NewContainerLocator(values[i]) - if err != nil { - return out, err - } - out = append(out, r) - } - return out, nil -} - -func NewContainerLocator(value string) (*ContainerLocator, error) { - res, hostString, id, errs := splitTypeHostId(value) - if errs != nil { - return nil, errs - } - if res != "" && ResourceType(res) != ResourceTypeContainer { - return nil, errors.New(fmt.Sprintf("%s is not a container", value)) - } - host, err := NewHostLocator(hostString) - if err != nil { - return nil, err - } - return &ContainerLocator{*host, id}, nil -} - -func NewGenericLocators(defaultType ResourceType, values ...string) (Locators, error) { - out := make(Locators, 0, len(values)) - for i := range values { - r, err := NewGenericLocator(defaultType, values[i]) - if err != nil { - return out, err - } - out = append(out, r) - } - return out, nil -} - -func NewGenericLocator(defaultType ResourceType, value string) (Locator, error) { - res, hostString, id, errs := splitTypeHostId(value) - if errs != nil { - return nil, errs - } - if res == "" { - res = defaultType - } - host, err := NewHostLocator(hostString) - if err != nil { - return nil, err - } - if ResourceType(res) == ResourceTypeContainer { - return &ContainerLocator{*host, id}, nil - } - return &GenericLocator{*host, id, ResourceType(res)}, nil -} - -func (r *HostLocator) IsDefaultPort() bool { - return r.Port == 0 -} -func (r *HostLocator) IsRemote() bool { - return r.Host != "" -} -func (r *HostLocator) Identity() string { - return r.HostIdentity() -} -func (r *HostLocator) HostIdentity() string { - if r.Host != "" { - if !r.IsDefaultPort() { - return net.JoinHostPort(r.Host, strconv.Itoa(int(r.Port))) - } - return r.Host - } - return LocalHostName -} -func (r *HostLocator) ResolvedHostname() string { - if r.Host != "" { - return r.Host - } - return "localhost" -} -func (h *HostLocator) NewURI() (*url.URL, error) { - port := "43273" - if !h.Port.Default() { - port = strconv.Itoa(int(h.Port)) - } - return &url.URL{ - Scheme: "http", - Host: net.JoinHostPort(h.ResolvedHostname(), port), - }, nil -} -func (r *HostLocator) BaseURL() *url.URL { - uri, err := r.NewURI() - if err != nil { - log.Fatal(err) - } - return uri -} -func (r *HostLocator) ResourceType() ResourceType { - return "" -} - -func (r *ContainerLocator) ResourceType() ResourceType { - return ResourceTypeContainer -} -func (r *ContainerLocator) Identity() string { - return r.HostLocator.HostIdentity() + "/" + string(r.Id) -} -func (r *ContainerLocator) Identifier() containers.Identifier { - return r.Id -} - -func (r *GenericLocator) ResourceType() ResourceType { - return r.Type -} -func (r *GenericLocator) Identity() string { - return string(r.Type) + "://" + r.HostLocator.HostIdentity() + "/" + string(r.Id) -} -func (r *GenericLocator) Identifier() containers.Identifier { - return r.Id -} diff --git a/contrib/geard-image.service b/contrib/geard-image.service new file mode 100644 index 00000000..5004a5dc --- /dev/null +++ b/contrib/geard-image.service @@ -0,0 +1,16 @@ +[Unit] +Description=Gear Provisioning Daemon (geard) +Documentation=https://github.com/openshift/geard + +[Service] +Type=simple +ExecStart=/usr/bin/docker run \ + -v /var/run/dbus/system_bus_socket:/var/run/dbus/system_bus_socket \ + -v /var/lib/containers:/var/lib/containers \ + -v /etc/systemd/system/container-active.target.wants:/etc/systemd/system/container-active.target.wants \ + -p 43273:43273 \ + -a stderr -a stdout \ + openshift/geard:latest + +[Install] +WantedBy=multi-user.target diff --git a/deployment/deployment.go b/deployment/deployment.go index c92a19e1..07e83c2d 100644 --- a/deployment/deployment.go +++ b/deployment/deployment.go @@ -5,8 +5,10 @@ package deployment import ( "encoding/json" "errors" + "fmt" "github.com/openshift/geard/containers" "github.com/openshift/geard/port" + "github.com/openshift/geard/transport" "os" "strconv" ) @@ -36,27 +38,37 @@ func NewDeploymentFromFile(path string) (*Deployment, error) { return deployment, nil } -func (d Deployment) Describe(placement PlacementStrategy) (next *Deployment, removed InstanceRefs, err error) { +func (d Deployment) Describe(placement PlacementStrategy, t transport.Transport) (next *Deployment, removed InstanceRefs, err error) { // copy the container list and clear any intermediate state sources := d.Containers.Copy() // assign instances to containers or the remove list - for _, instance := range d.Instances { + for i := range d.Instances { + instance := &d.Instances[i] + copied := *instance // is the instance invalid or no longer part of the cluster if instance.On == nil { continue } - if placement.RemoveFromLocation(instance.On) { - removed = append(removed, &instance) + if instance.on == nil { + locator, errl := t.LocatorFor(*instance.On) + if errl != nil { + err = errors.New(fmt.Sprintf("The host %s for instance %s is not recognized - you may be using a different transport than originally specified: %s", *instance.On, instance.Id, errl.Error())) + return + } + instance.on = locator + } + if placement.RemoveFromLocation(instance.on) { + removed = append(removed, &copied) continue } // locate the container c, found := sources.Find(instance.From) if !found { - removed = append(removed, &instance) + removed = append(removed, &copied) continue } - c.AddInstance(&instance) + c.AddInstance(&copied) } // create new instances for each container @@ -191,23 +203,6 @@ func (d *Deployment) UpdateLinks() { } } -// Return a set of container locators from the specified deployment -// descriptor. -func ExtractContainerLocatorsFromDeployment(path string, args *[]string) error { - if path == "" { - return nil - } - deployment, err := NewDeploymentFromFile(path) - if err != nil { - return err - } - ids := deployment.Instances.Ids() - for i := range ids { - *args = append(*args, ids[i].Identity()) - } - return nil -} - // A container description type Container struct { Name string diff --git a/deployment/deployment_test.go b/deployment/deployment_test.go index 05634614..d3a1ca6d 100644 --- a/deployment/deployment_test.go +++ b/deployment/deployment_test.go @@ -2,17 +2,30 @@ package deployment import ( "encoding/json" - "github.com/openshift/geard/cmd" + "github.com/openshift/geard/http" "github.com/openshift/geard/port" + "github.com/openshift/geard/transport" "io/ioutil" + "log" "regexp" "strings" "testing" ) -var localhost = cmd.HostLocator{"127.0.0.1", 0} -var noHosts PlacementStrategy = SimplePlacement(cmd.Locators{}) -var oneHost PlacementStrategy = SimplePlacement(cmd.Locators{&localhost}) +var loopbackTransport = http.NewHttpTransport() +var localTransport = transport.Local +var localhost transport.Locator +var noHosts PlacementStrategy = SimplePlacement(transport.Locators{}) +var oneHost PlacementStrategy + +func init() { + host, err := transport.NewHostLocator("127.0.0.1") + if err != nil { + log.Fatal(err) + } + localhost = host + oneHost = SimplePlacement(transport.Locators{host}) +} func loadDeployment(path string) *Deployment { body, err := ioutil.ReadFile(path) @@ -63,10 +76,10 @@ func TestPrepareDeployment(t *testing.T) { } ] }`) - if _, _, err := dep.Describe(noHosts); err != nil { + if _, _, err := dep.Describe(noHosts, loopbackTransport); err != nil { t.Fatal("Should not return error when describing with no hosts") } - next, removed, err := dep.Describe(oneHost) + next, removed, err := dep.Describe(oneHost, loopbackTransport) if err != nil { t.Fatal("Error when describing one host", err) } @@ -96,7 +109,7 @@ func TestPrepareDeploymentExternal(t *testing.T) { } ] }`) - next, removed, err := dep.Describe(oneHost) + next, removed, err := dep.Describe(oneHost, loopbackTransport) if err != nil { t.Fatal("Error when describing one host", err) } @@ -127,7 +140,7 @@ func TestPrepareDeploymentRemoveMissing(t *testing.T) { } ] }`) - next, removed, err := dep.Describe(oneHost) + next, removed, err := dep.Describe(oneHost, loopbackTransport) if err != nil { t.Fatal("Error when describing one host", err) } @@ -138,8 +151,10 @@ func TestPrepareDeploymentRemoveMissing(t *testing.T) { t.Fatal("Instances without hosts should be ignored", removed) } - dep.Instances[0].On = &localhost - next, removed, err = dep.Describe(oneHost) + log.Printf("Localhost %+v", localhost) + s := localhost.String() + dep.Instances[0].On = &s + next, removed, err = dep.Describe(oneHost, loopbackTransport) if err != nil { t.Fatal("Error when describing one host", err) } @@ -172,12 +187,12 @@ func TestPrepareDeploymentError(t *testing.T) { } ] }`) - if _, _, err := dep.Describe(oneHost); err != nil { + if _, _, err := dep.Describe(oneHost, loopbackTransport); err != nil { t.Fatal("Should not have received an error", err.Error()) } dep.Containers[0].Links[0].Ports = []port.Port{port.Port(8081)} - if _, _, err := dep.Describe(oneHost); err == nil { + if _, _, err := dep.Describe(oneHost, loopbackTransport); err == nil { t.Fatal("Should have received an error") } else { if !regexp.MustCompile("target port 8081 on web is not found").MatchString(err.Error()) { @@ -188,7 +203,7 @@ func TestPrepareDeploymentError(t *testing.T) { link := &dep.Containers[0].Links[0] link.Ports = []port.Port{} link.To = "db" - if _, _, err := dep.Describe(oneHost); err == nil { + if _, _, err := dep.Describe(oneHost, loopbackTransport); err == nil { t.Fatal("Should have received an error") } else { if !regexp.MustCompile("target db has no public ports to link to from web").MatchString(err.Error()) { @@ -197,7 +212,7 @@ func TestPrepareDeploymentError(t *testing.T) { } dep.Containers[1].PublicPorts = port.PortPairs{port.PortPair{port.Port(27017), 0}} - next, removed, err := dep.Describe(oneHost) + next, removed, err := dep.Describe(oneHost, loopbackTransport) if err != nil { t.Fatal("Should not have received an error", err.Error()) } @@ -216,7 +231,7 @@ func TestPrepareDeploymentError(t *testing.T) { dep.Containers[0].Links = append(dep.Containers[0].Links, Link{ To: "web", }) - next, removed, err = dep.Describe(oneHost) + next, removed, err = dep.Describe(oneHost, loopbackTransport) if err != nil { t.Fatal("Should not have received an error", err.Error()) } @@ -239,7 +254,7 @@ func TestPrepareDeploymentError(t *testing.T) { func TestPrepareDeploymentInterlink(t *testing.T) { dep := loadDeployment("./fixtures/complex_deploy.json") - changes, _, err := dep.Describe(oneHost) + changes, _, err := dep.Describe(oneHost, loopbackTransport) if err != nil { t.Fatal("Should not have received an error", err) } @@ -266,7 +281,7 @@ func TestPrepareDeploymentInterlink(t *testing.T) { func TestPrepareDeploymentMongo(t *testing.T) { dep := loadDeployment("./fixtures/mongo_deploy.json") - changes, _, err := dep.Describe(oneHost) + changes, _, err := dep.Describe(oneHost, loopbackTransport) if err != nil { t.Fatal("Should not have received an error", err) } @@ -300,7 +315,7 @@ func TestPrepareDeploymentMongo(t *testing.T) { func TestReloadDeploymentMongo(t *testing.T) { dep := loadDeployment("./fixtures/mongo_deploy_existing.json") - changes, _, err := dep.Describe(oneHost) + changes, _, err := dep.Describe(oneHost, loopbackTransport) if err != nil { t.Fatal("Should not have received an error", err) } @@ -331,17 +346,17 @@ func TestReloadDeploymentMongo(t *testing.T) { } } - changes, removed, err := dep.Describe(noHosts) + changes, removed, err := dep.Describe(noHosts, loopbackTransport) if err != nil { t.Fatal("Should not have received an error", err) } + b, _ := json.MarshalIndent(changes, "", " ") + t.Log(string(b)) + if len(changes.Instances) != 0 { t.Fatalf("Expected %d instances, got %d", 0, len(changes.Instances)) } if len(removed) != 3 { t.Fatalf("Expected to remove %d instances, got %d", 3, len(removed)) } - - // b, _ := json.MarshalIndent(changes, "", " ") - // t.Log(string(b)) } diff --git a/deployment/fixtures/mongo_deploy_existing.json b/deployment/fixtures/mongo_deploy_existing.json index 68ef3377..16b3f518 100644 --- a/deployment/fixtures/mongo_deploy_existing.json +++ b/deployment/fixtures/mongo_deploy_existing.json @@ -23,10 +23,7 @@ "Id": "db-1", "Image": "ccoleman/ubuntu-mongodb-repl", "From": "db", - "On": { - "Host": "localhost", - "Port": 0 - }, + "On": "localhost", "Ports": [ { "Internal": 27017, @@ -42,10 +39,7 @@ "Id": "db-2", "Image": "ccoleman/ubuntu-mongodb-repl", "From": "db", - "On": { - "Host": "localhost", - "Port": 0 - }, + "On": "localhost", "Ports": [ { "Internal": 27017, @@ -61,10 +55,7 @@ "Id": "db-3", "Image": "ccoleman/ubuntu-mongodb-repl", "From": "db", - "On": { - "Host": "localhost", - "Port": 0 - }, + "On": "localhost", "Ports": [ { "Internal": 27017, diff --git a/deployment/instance.go b/deployment/instance.go index 66b109f7..7a45e550 100644 --- a/deployment/instance.go +++ b/deployment/instance.go @@ -1,8 +1,10 @@ package deployment import ( - "github.com/openshift/geard/cmd" + "errors" + "fmt" "github.com/openshift/geard/containers" + "github.com/openshift/geard/transport" ) // A container that has been created on a server @@ -13,8 +15,8 @@ type Instance struct { Image string // The container definition this is from From string - // The host system this is or should be deployed on - On *cmd.HostLocator `json:"On,omitempty"` + // The deployed location - nil for not deployed + On *string `json:"On,omitempty"` // The mapping of internal, external, and remote ports Ports PortMappings `json:"Ports,omitempty"` @@ -25,8 +27,12 @@ type Instance struct { // The container this instance is associated with container *Container + // The resolved locator of an instance + on transport.Locator // The generated links for this instance links InstanceLinks + // A cached hostname for this instance + hostname string } type Instances []Instance type InstanceRefs []*Instance @@ -43,8 +49,16 @@ func (i *Instance) MarkRemoved() { i.remove = true } -func (i *Instance) ResolvedHostname() string { - return i.On.ResolvedHostname() +func (i *Instance) Place(on transport.Locator) { + i.on = on + s := on.String() + i.On = &s +} +func (i *Instance) ResolveHostname() (string, error) { + if i.on == nil { + return "", errors.New(fmt.Sprintf("No locator available for this instance (can't resolve from %s)", i.On)) + } + return i.on.ResolveHostname() } func (i *Instance) EnvironmentVariables() { @@ -59,6 +73,14 @@ func (instances Instances) Find(id containers.Identifier) (*Instance, bool) { return nil, false } +func (instances Instances) References() InstanceRefs { + refs := make(InstanceRefs, 0, 5) + for i := range instances { + refs = append(refs, &instances[i]) + } + return refs +} + func (instances Instances) ReferencesFor(name string) InstanceRefs { refs := make(InstanceRefs, 0, 5) for i := range instances { @@ -69,40 +91,22 @@ func (instances Instances) ReferencesFor(name string) InstanceRefs { return refs } -func (refs Instances) Ids() (ids []cmd.Locator) { - ids = make([]cmd.Locator, 0, len(refs)) - for i := range refs { - ids = append(ids, &cmd.ContainerLocator{*refs[i].On, refs[i].Id}) - } - return -} - -func (refs Instances) AddedIds() (ids []cmd.Locator) { - ids = make([]cmd.Locator, 0, len(refs)) +func (refs Instances) Added() InstanceRefs { + adds := make(InstanceRefs, 0, len(refs)) for i := range refs { if refs[i].add { - ids = append(ids, &cmd.ContainerLocator{*refs[i].On, refs[i].Id}) + adds = append(adds, &refs[i]) } } - return + return adds } -func (refs Instances) LinkedIds() (ids []cmd.Locator) { - ids = make([]cmd.Locator, 0, len(refs)) +func (refs Instances) Linked() InstanceRefs { + linked := make(InstanceRefs, 0, len(refs)) for i := range refs { if len(refs[i].links) > 0 { - ids = append(ids, &cmd.ContainerLocator{*refs[i].On, refs[i].Id}) - } - } - return -} - -func (refs InstanceRefs) Ids() (ids []cmd.Locator) { - ids = make([]cmd.Locator, 0, len(refs)) - for i := range refs { - if refs[i] != nil { - ids = append(ids, &cmd.ContainerLocator{*refs[i].On, refs[i].Id}) + linked = append(linked, &refs[i]) } } - return + return linked } diff --git a/deployment/links.go b/deployment/links.go index 1d431388..ad55813e 100644 --- a/deployment/links.go +++ b/deployment/links.go @@ -43,10 +43,6 @@ func (links InstanceLinks) NetworkLinks() (dup containers.NetworkLinks) { return } -type hostnameResolver interface { - ResolvedHostname() string -} - // Return the set of links that should be resolved func (sources Containers) OrderLinks() (ordered containerLinks, err error) { links := make(containerLinks, 0) @@ -189,7 +185,12 @@ func (link containerLink) appendLinks() error { if !found { return errors.New(fmt.Sprintf("deployment: instance does not expose %d for link %s", port, link.String())) } - log.Printf("appending %d on %s: %+v %+v", port, instance.Id, mapping, instance) + //log.Printf("appending %d on %s: %+v %+v", port, instance.Id, mapping, instance) + + name, err := target.ResolveHostname() + if err != nil { + return err + } instance.links = append(instance.links, InstanceLink{ NetworkLink: containers.NetworkLink{ @@ -197,7 +198,7 @@ func (link containerLink) appendLinks() error { FromPort: mapping.Target.Port, ToPort: mapping.External, - ToHost: instance.ResolvedHostname(), + ToHost: name, }, from: link.Target.Name, fromPort: port, diff --git a/deployment/placement.go b/deployment/placement.go index cee56931..ff4fbe85 100644 --- a/deployment/placement.go +++ b/deployment/placement.go @@ -1,13 +1,13 @@ package deployment import ( - "github.com/openshift/geard/cmd" + "github.com/openshift/geard/transport" ) type PlacementStrategy interface { // Return true if the location of an existing container is no // longer valid. - RemoveFromLocation(cmd.Locator) bool + RemoveFromLocation(locator transport.Locator) bool // Allow the strategy to determine which location will host a // container by setting Instance.On for each container in added. // Failing to set an "On" for a container will return an error. @@ -19,19 +19,24 @@ type PlacementStrategy interface { Assign(added InstanceRefs, containers Containers) error } -type SimplePlacement cmd.Locators +type SimplePlacement transport.Locators -func (p SimplePlacement) RemoveFromLocation(on cmd.Locator) bool { - return !cmd.Locators(p).Has(on) +func (p SimplePlacement) RemoveFromLocation(on transport.Locator) bool { + for _, l := range transport.Locators(p) { + if l.String() == on.String() { + return false + } + } + return true } func (p SimplePlacement) Assign(added InstanceRefs, containers Containers) error { - locators := cmd.Locators(p) + locators := transport.Locators(p) pos := 0 for i := range added { instance := added[i] if len(locators) > 0 { - host, _ := cmd.NewHostLocator(locators[pos%len(locators)].HostIdentity()) - instance.On = host + locator := locators[pos%len(locators)] + instance.Place(locator) pos++ } else { instance.MarkRemoved() diff --git a/deployment/ports.go b/deployment/ports.go index 4c6d4d75..7f2d2933 100644 --- a/deployment/ports.go +++ b/deployment/ports.go @@ -78,6 +78,7 @@ func NewInstancePortTable(sources Containers) (*InstancePortTable, error) { // make existing reservations for i := range sources { instances := sources[i].Instances() + for j := range instances { instance := instances[j] for k := range instance.Ports { @@ -85,6 +86,7 @@ func NewInstancePortTable(sources Containers) (*InstancePortTable, error) { if target.Empty() { continue } + _, found := table.reserved[target] if found { return nil, errors.New(fmt.Sprintf("deployment: The port %s is assigned to multiple instances (last: %s)", target.String(), instance.Id)) diff --git a/git/cmd/cmd.go b/git/cmd/cmd.go index c3a3b556..24358169 100644 --- a/git/cmd/cmd.go +++ b/git/cmd/cmd.go @@ -4,11 +4,12 @@ import ( . "github.com/openshift/geard/cmd" "github.com/openshift/geard/containers" "github.com/openshift/geard/git" - "github.com/openshift/geard/git/http" gitjobs "github.com/openshift/geard/git/jobs" "github.com/openshift/geard/jobs" sshjobs "github.com/openshift/geard/ssh/jobs" "github.com/openshift/geard/systemd" + "github.com/openshift/geard/transport" + "github.com/spf13/cobra" "os" ) @@ -52,26 +53,33 @@ func repoCreate(c *cobra.Command, args []string) { Fail(1, "Valid arguments: []\n") } - id, err := NewGenericLocator(git.ResourceTypeRepository, args[0]) + t := c.Flags().Lookup("transport").Value.(*transport.TransportFlag).Get() + + id, err := NewResourceLocator(t, git.ResourceTypeRepository, args[0]) if err != nil { Fail(1, "You must pass one valid repository name: %s\n", err.Error()) } - - if id.ResourceType() != git.ResourceTypeRepository { + if id.(*ResourceLocator).Type != git.ResourceTypeRepository { Fail(1, "You must pass one valid repository name: %s\n", err.Error()) } + cloneUrl := "" + if len(args) == 2 { + cloneUrl = args[1] + } + Executor{ On: Locators{id}, Serial: func(on Locator) jobs.Job { - var req http.HttpCreateRepositoryRequest - req = http.HttpCreateRepositoryRequest{} - req.Id = git.RepoIdentifier(on.(ResourceLocator).Identifier()) - - return &req + return &gitjobs.CreateRepositoryRequest{ + Id: git.RepoIdentifier(on.(*ResourceLocator).Id), + CloneUrl: cloneUrl, + RequestId: jobs.NewRequestIdentifier(), + } }, Output: os.Stdout, LocalInit: LocalInitializers(systemd.Start, containers.InitializeData), + Transport: t, }.StreamAndExit() } diff --git a/git/cmd/extend.go b/git/cmd/extend.go index 6947390e..2373b9e4 100644 --- a/git/cmd/extend.go +++ b/git/cmd/extend.go @@ -13,7 +13,7 @@ import ( func init() { cmd.AddInitializer(git.InitializeData, cmd.ForDaemon) - http.AddHttpExtension(githttp.Routes) + http.AddHttpExtension(&githttp.HttpExtension{}) cmd.AddCommandExtension(registerRemote, false) cmd.AddCommandExtension(registerLocal, true) diff --git a/git/http/handlers.go b/git/http/handlers.go index 2ea0fbf1..0d2a7280 100644 --- a/git/http/handlers.go +++ b/git/http/handlers.go @@ -10,7 +10,9 @@ import ( "github.com/openshift/go-json-rest" ) -func Routes() []http.HttpJobHandler { +type HttpExtension struct{} + +func (h *HttpExtension) Routes() []http.HttpJobHandler { return []http.HttpJobHandler{ &HttpCreateRepositoryRequest{}, &httpGitArchiveContentRequest{ @@ -19,6 +21,16 @@ func Routes() []http.HttpJobHandler { } } +func (h *HttpExtension) HttpJobFor(job jobs.Job) (exc http.RemoteExecutable, err error) { + switch j := job.(type) { + case *gitjobs.CreateRepositoryRequest: + exc = &HttpCreateRepositoryRequest{CreateRepositoryRequest: *j} + case *gitjobs.GitArchiveContentRequest: + exc = &httpGitArchiveContentRequest{GitArchiveContentRequest: *j} + } + return +} + type HttpCreateRepositoryRequest struct { gitjobs.CreateRepositoryRequest http.DefaultRequest diff --git a/http/extend.go b/http/extend.go new file mode 100644 index 00000000..0efbfbe3 --- /dev/null +++ b/http/extend.go @@ -0,0 +1,9 @@ +package http + +import ( + "github.com/openshift/geard/transport" +) + +func init() { + transport.RegisterTransport("http", NewHttpTransport()) +} diff --git a/http/extension.go b/http/extension.go index de3a5b66..b208da6f 100644 --- a/http/extension.go +++ b/http/extension.go @@ -1,9 +1,16 @@ package http +import ( + "github.com/openshift/geard/jobs" +) + // All registered extensions var extensions []HttpExtension -type HttpExtension func() []HttpJobHandler +type HttpExtension interface { + Routes() []HttpJobHandler + HttpJobFor(job jobs.Job) (RemoteExecutable, error) +} // Register an extension to this server during init() or startup func AddHttpExtension(extension HttpExtension) { diff --git a/http/handlers.go b/http/handlers.go index 71ef9249..1c64efef 100644 --- a/http/handlers.go +++ b/http/handlers.go @@ -70,7 +70,6 @@ func (h *HttpInstallContainerRequest) Handler(conf *HttpConfiguration) JobHandle type HttpDeleteContainerRequest struct { jobs.DeleteContainerRequest DefaultRequest - Label string } func (h *HttpDeleteContainerRequest) HttpMethod() string { return "DELETE" } @@ -81,17 +80,13 @@ func (h *HttpDeleteContainerRequest) Handler(conf *HttpConfiguration) JobHandler if errg != nil { return nil, errg } - return &jobs.DeleteContainerRequest{id}, nil + return &jobs.DeleteContainerRequest{Id: id}, nil } } -func (h *HttpDeleteContainerRequest) JobLabel() string { - return h.Label -} type HttpListContainersRequest struct { jobs.ListContainersRequest DefaultRequest - Label string } func (h *HttpListContainersRequest) HttpMethod() string { return "GET" } @@ -101,9 +96,6 @@ func (h *HttpListContainersRequest) Handler(conf *HttpConfiguration) JobHandler return &jobs.ListContainersRequest{}, nil } } -func (h *HttpListContainersRequest) JobLabel() string { - return h.Label -} type HttpListBuildsRequest jobs.ListBuildsRequest @@ -362,7 +354,6 @@ func (h *HttpContentRequest) Handler(conf *HttpConfiguration) JobHandler { } type HttpLinkContainersRequest struct { - Label string jobs.LinkContainersRequest DefaultRequest } @@ -383,12 +374,9 @@ func (h *HttpLinkContainersRequest) Handler(conf *HttpConfiguration) JobHandler return nil, err } - return &jobs.LinkContainersRequest{data}, nil + return &jobs.LinkContainersRequest{ContainerLinks: data}, nil } } -func (h *HttpLinkContainersRequest) JobLabel() string { - return h.Label -} var reSplat = regexp.MustCompile("\\:[a-z\\*]+") diff --git a/http/job_response.go b/http/job_response.go index cb8a853c..ba73e821 100644 --- a/http/job_response.go +++ b/http/job_response.go @@ -110,15 +110,6 @@ func (s *httpJobResponse) statusCode(t jobs.JobResponseSuccess, stream, data boo } } -func (s *httpJobResponse) WriteClosed() <-chan bool { - if c, ok := s.response.(http.CloseNotifier); ok { - return c.CloseNotify() - } - ch := make(chan bool) - close(ch) - return ch -} - func (s *httpJobResponse) WritePendingSuccess(name string, value interface{}) { if s.pending == nil { s.pending = make(map[string]string) @@ -130,7 +121,7 @@ func (s *httpJobResponse) WritePendingSuccess(name string, value interface{}) { } } -func (s *httpJobResponse) Failure(e jobs.JobError) { +func (s *httpJobResponse) Failure(err error) { if s.succeeded { panic("May not invoke failure after Success()") } @@ -139,24 +130,27 @@ func (s *httpJobResponse) Failure(e jobs.JobError) { } s.failed = true - response := httpFailureResponse{e.Error(), e.ResponseData()} - var code int - switch e.ResponseFailure() { - case jobs.JobResponseAlreadyExists: - code = http.StatusConflict - case jobs.JobResponseNotFound: - code = http.StatusNotFound - case jobs.JobResponseInvalidRequest: - code = http.StatusBadRequest - case jobs.JobResponseNotAcceptable: - code = http.StatusNotAcceptable - case jobs.JobResponseRateLimit: - code = 429 // http.statusTooManyRequests - default: - code = http.StatusInternalServerError + code := http.StatusInternalServerError + response := httpFailureResponse{err.Error(), nil} + s.response.Header().Set("Content-Type", "application/json") + + if e, ok := err.(jobs.JobError); ok { + response.Data = e.ResponseData() + + switch e.ResponseFailure() { + case jobs.JobResponseAlreadyExists: + code = http.StatusConflict + case jobs.JobResponseNotFound: + code = http.StatusNotFound + case jobs.JobResponseInvalidRequest: + code = http.StatusBadRequest + case jobs.JobResponseNotAcceptable: + code = http.StatusNotAcceptable + case jobs.JobResponseRateLimit: + code = 429 // http.statusTooManyRequests + } } - s.response.Header().Set("Content-Type", "application/json") s.response.WriteHeader(code) json.NewEncoder(s.response).Encode(&response) } diff --git a/http/remote.go b/http/remote.go index 0b00b251..f98dcc3d 100644 --- a/http/remote.go +++ b/http/remote.go @@ -4,21 +4,22 @@ import ( "encoding/json" "errors" "fmt" - "github.com/openshift/geard/jobs" "io" "log" + "net" "net/http" "net/url" "os" + "strings" + + "github.com/openshift/geard/jobs" + "github.com/openshift/geard/transport" ) -type Locator interface { - IsRemote() bool - Identity() string -} +const DefaultHttpPort = "43273" type RemoteLocator interface { - BaseURL() *url.URL + ToURL() *url.URL } type RemoteJob interface { @@ -33,26 +34,103 @@ type RemoteExecutable interface { UnmarshalHttpResponse(headers http.Header, r io.Reader, mode ResponseContentMode) (interface{}, error) } -type HttpDispatcher struct { - client *http.Client - locator RemoteLocator - log *log.Logger +type HttpTransport struct { + client *http.Client } -func NewHttpDispatcher(l RemoteLocator, logger *log.Logger) *HttpDispatcher { - if logger == nil { - logger = log.New(os.Stdout, "", 0) +func NewHttpTransport() *HttpTransport { + return &HttpTransport{&http.Client{}} +} + +func (h *HttpTransport) LocatorFor(value string) (transport.Locator, error) { + return transport.NewHostLocator(value) +} + +func (h *HttpTransport) RemoteJobFor(locator transport.Locator, j jobs.Job) (job jobs.Job, err error) { + if locator == transport.Local { + job = j + return + } + baseUrl, errl := urlForLocator(locator) + if errl != nil { + err = errors.New("The provided host is not valid '" + locator.String() + "': " + errl.Error()) + return + } + httpJob, errh := HttpJobFor(j) + if errh != nil { + err = errh + return + } + + job = jobs.JobFunction(func(res jobs.JobResponse) { + if err := h.ExecuteRemote(baseUrl, httpJob, res); err != nil { + res.Failure(err) + } + }) + return +} + +func urlForLocator(locator transport.Locator) (*url.URL, error) { + base := locator.String() + if strings.Contains(base, ":") { + host, port, err := net.SplitHostPort(base) + if err != nil { + return nil, err + } + if port == "" { + base = net.JoinHostPort(host, DefaultHttpPort) + } + } else { + base = net.JoinHostPort(base, DefaultHttpPort) + } + return &url.URL{Scheme: "http", Host: base}, nil +} + +func HttpJobFor(job jobs.Job) (exc RemoteExecutable, err error) { + switch j := job.(type) { + case *jobs.InstallContainerRequest: + exc = &HttpInstallContainerRequest{InstallContainerRequest: *j} + case *jobs.StartedContainerStateRequest: + exc = &HttpStartContainerRequest{StartedContainerStateRequest: *j} + case *jobs.StoppedContainerStateRequest: + exc = &HttpStopContainerRequest{StoppedContainerStateRequest: *j} + case *jobs.RestartContainerRequest: + exc = &HttpRestartContainerRequest{RestartContainerRequest: *j} + case *jobs.PutEnvironmentRequest: + exc = &HttpPutEnvironmentRequest{PutEnvironmentRequest: *j} + case *jobs.PatchEnvironmentRequest: + exc = &HttpPatchEnvironmentRequest{PatchEnvironmentRequest: *j} + case *jobs.ContainerStatusRequest: + exc = &HttpContainerStatusRequest{ContainerStatusRequest: *j} + case *jobs.ContentRequest: + exc = &HttpContentRequest{ContentRequest: *j} + case *jobs.DeleteContainerRequest: + exc = &HttpDeleteContainerRequest{DeleteContainerRequest: *j} + case *jobs.LinkContainersRequest: + exc = &HttpLinkContainersRequest{LinkContainersRequest: *j} + case *jobs.ListContainersRequest: + exc = &HttpListContainersRequest{ListContainersRequest: *j} + default: + for _, ext := range extensions { + req, errr := ext.HttpJobFor(job) + if errr != nil { + return nil, errr + } + if req != nil { + exc = req + break + } + } } - return &HttpDispatcher{ - client: &http.Client{}, - locator: l, - log: logger, + if exc == nil { + err = errors.New("The provided job cannot be sent remotely") } + return } -func (h *HttpDispatcher) Dispatch(job RemoteExecutable, res jobs.JobResponse) error { +func (h *HttpTransport) ExecuteRemote(baseUrl *url.URL, job RemoteExecutable, res jobs.JobResponse) error { reader, writer := io.Pipe() - httpreq, errn := http.NewRequest(job.HttpMethod(), h.locator.BaseURL().String(), reader) + httpreq, errn := http.NewRequest(job.HttpMethod(), baseUrl.String(), reader) if errn != nil { return errn } @@ -75,7 +153,7 @@ func (h *HttpDispatcher) Dispatch(job RemoteExecutable, res jobs.JobResponse) er req.URL.RawQuery = query.Encode() go func() { if err := job.MarshalHttpRequestBody(writer); err != nil { - h.log.Printf("remote: Error when writing to http: %v", err) + log.Printf("http_remote: Error when writing to http: %v", err) writer.CloseWithError(err) } else { writer.Close() diff --git a/http/server.go b/http/server.go index ff84a455..7ed602e4 100644 --- a/http/server.go +++ b/http/server.go @@ -73,8 +73,8 @@ func (conf *HttpConfiguration) Handler() http.Handler { &HttpContentRequest{ContentRequest: jobs.ContentRequest{Type: jobs.ContentTypeEnvironment}}, } - for i := range extensions { - routes := extensions[i]() + for _, ext := range extensions { + routes := ext.Routes() for j := range routes { handlers = append(handlers, routes[j]) } diff --git a/jobs/delete_container.go b/jobs/delete_container.go index 5fe6b706..51bb557f 100644 --- a/jobs/delete_container.go +++ b/jobs/delete_container.go @@ -10,7 +10,12 @@ import ( ) type DeleteContainerRequest struct { - Id containers.Identifier + Id containers.Identifier + Label string +} + +func (j *DeleteContainerRequest) JobLabel() string { + return j.Label } func (j *DeleteContainerRequest) Execute(resp JobResponse) { diff --git a/jobs/jobs.go b/jobs/jobs.go index e10dce9b..ae8aeabb 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -48,9 +48,8 @@ type JobResponse interface { Success(t JobResponseSuccess) SuccessWithData(t JobResponseSuccess, data interface{}) SuccessWithWrite(t JobResponseSuccess, flush, structured bool) io.Writer - Failure(reason JobError) + Failure(reason error) - WriteClosed() <-chan bool WritePendingSuccess(name string, value interface{}) } diff --git a/jobs/link_containers.go b/jobs/link_containers.go index c6e071d7..a5fe9422 100644 --- a/jobs/link_containers.go +++ b/jobs/link_containers.go @@ -43,6 +43,11 @@ func (link *ContainerLinks) Check() error { type LinkContainersRequest struct { *ContainerLinks + Label string +} + +func (j *LinkContainersRequest) JobLabel() string { + return j.Label } func (j *LinkContainersRequest) Execute(resp JobResponse) { diff --git a/jobs/list_units.go b/jobs/list_units.go index 0c7b22bf..9a4fcae6 100644 --- a/jobs/list_units.go +++ b/jobs/list_units.go @@ -35,17 +35,24 @@ func unitsMatching(re *regexp.Regexp, found func(name string, unit *dbus.UnitSta return err } - for _, unit := range all { + for i := range all { + unit := &all[i] if matched := re.MatchString(unit.Name); matched { name := re.FindStringSubmatch(unit.Name)[1] - found(name, &unit) + found(name, unit) } } return nil } type ListContainersRequest struct { + Label string } + +func (l *ListContainersRequest) JobLabel() string { + return l.Label +} + type ContainerUnitResponse struct { unitResponse LoadState string diff --git a/ssh/cmd/cmd.go b/ssh/cmd/cmd.go index 729b59a0..1c134368 100644 --- a/ssh/cmd/cmd.go +++ b/ssh/cmd/cmd.go @@ -4,18 +4,19 @@ import ( "bufio" "bytes" "errors" + "github.com/spf13/cobra" + "io/ioutil" + "os" + "os/user" + "path/filepath" + . "github.com/openshift/geard/cmd" "github.com/openshift/geard/containers" "github.com/openshift/geard/jobs" sshkey "github.com/openshift/geard/pkg/ssh-public-key" "github.com/openshift/geard/ssh" - . "github.com/openshift/geard/ssh/http" . "github.com/openshift/geard/ssh/jobs" - "github.com/spf13/cobra" - "io/ioutil" - "os" - "os/user" - "path/filepath" + "github.com/openshift/geard/transport" ) var ( @@ -78,8 +79,11 @@ func addSshKeys(cmd *cobra.Command, args []string) { if len(args) < 1 { Fail(1, "Valid arguments: ...") } + + t := cmd.Flags().Lookup("transport").Value.(*transport.TransportFlag).Transport + // args... are locators for repositories or containers - ids, err := NewGenericLocators(ResourceTypeContainer, args...) + ids, err := NewResourceLocators(t, ResourceTypeContainer, args...) if err != nil { Fail(1, "You must pass 1 or more valid names: %s", err.Error()) } @@ -91,14 +95,15 @@ func addSshKeys(cmd *cobra.Command, args []string) { allPerms := make(map[string]*KeyPermission) for i := range ids { + resourceType := ids[i].(*ResourceLocator).Type if permissionHandlers == nil { - Fail(1, "The type '%s' is not supported by this command", ids[i].ResourceType()) + Fail(1, "The type '%s' is not supported by this command", resourceType) } - h, ok := permissionHandlers[ids[i].ResourceType()] + h, ok := permissionHandlers[resourceType] if !ok { - Fail(1, "The type '%s' is not supported by this command", ids[i].ResourceType()) + Fail(1, "The type '%s' is not supported by this command", resourceType) } - perm, err := h.CreatePermission(cmd, string(ids[i].(ResourceLocator).Identifier())) + perm, err := h.CreatePermission(cmd, ids[i].(*ResourceLocator).Id) if err != nil { Fail(1, err.Error()) } @@ -113,18 +118,17 @@ func addSshKeys(cmd *cobra.Command, args []string) { permissions = append(permissions, *allPerms[on[i].Identity()]) } - return &HttpCreateKeysRequest{ - CreateKeysRequest: CreateKeysRequest{ - &ExtendedCreateKeysData{ - Keys: keys, - Permissions: permissions, - }, + return &CreateKeysRequest{ + &ExtendedCreateKeysData{ + Keys: keys, + Permissions: permissions, }, } }, Output: os.Stdout, //TODO: display partial error info LocalInit: containers.InitializeData, + Transport: t, }.StreamAndExit() } diff --git a/ssh/cmd/extend.go b/ssh/cmd/extend.go index 4ecf88fc..69308405 100644 --- a/ssh/cmd/extend.go +++ b/ssh/cmd/extend.go @@ -9,8 +9,7 @@ import ( ) func init() { - http.AddHttpExtension(sshhttp.Routes) - + http.AddHttpExtension(&sshhttp.HttpExtension{}) cmd.AddCommandExtension(registerLocal, true) cmd.AddCommandExtension(registerRemote, false) } diff --git a/ssh/http/handlers.go b/ssh/http/handlers.go index bdca469a..9b2f1013 100644 --- a/ssh/http/handlers.go +++ b/ssh/http/handlers.go @@ -5,16 +5,27 @@ import ( "github.com/openshift/geard/http" "github.com/openshift/geard/jobs" sshjobs "github.com/openshift/geard/ssh/jobs" + "github.com/openshift/go-json-rest" "io" ) -func Routes() []http.HttpJobHandler { +type HttpExtension struct{} + +func (h *HttpExtension) Routes() []http.HttpJobHandler { return []http.HttpJobHandler{ &HttpCreateKeysRequest{}, } } +func (h *HttpExtension) HttpJobFor(job jobs.Job) (exc http.RemoteExecutable, err error) { + switch j := job.(type) { + case *sshjobs.CreateKeysRequest: + exc = &HttpCreateKeysRequest{CreateKeysRequest: *j} + } + return +} + type HttpCreateKeysRequest struct { sshjobs.CreateKeysRequest http.DefaultRequest diff --git a/tests/integration_test.go b/tests/integration_test.go index 4fbdcae6..ca6b5868 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -410,7 +410,7 @@ func (s *IntegrationTestSuite) TestStartStopContainer(c *chk.C) { hostContainerId := fmt.Sprintf("%v/%v", s.daemonURI, id) - cmd := exec.Command("/usr/bin/gear", "install", TestImage, hostContainerId, "--ports=8080:4001", "--isolate") + cmd := exec.Command("/usr/bin/gear", "install", TestImage, hostContainerId, "--ports=8080:34957", "--isolate") data, err := cmd.CombinedOutput() c.Log(string(data)) c.Assert(err, chk.IsNil) @@ -454,7 +454,7 @@ func (s *IntegrationTestSuite) TestRestartContainer(c *chk.C) { hostContainerId := fmt.Sprintf("%v/%v", s.daemonURI, id) - cmd := exec.Command("/usr/bin/gear", "install", TestImage, hostContainerId, "--ports=8080:4002", "--start", "--isolate") + cmd := exec.Command("/usr/bin/gear", "install", TestImage, hostContainerId, "--ports=8080:0", "--start", "--isolate") data, err := cmd.CombinedOutput() c.Log(string(data)) c.Assert(err, chk.IsNil) @@ -529,7 +529,7 @@ func (s *IntegrationTestSuite) TestLongContainerName(c *chk.C) { hostContainerId := fmt.Sprintf("%v/%v", s.daemonURI, id) - cmd := exec.Command("/usr/bin/gear", "install", TestImage, hostContainerId, "--start", "--ports=8080:4003", "--isolate") + cmd := exec.Command("/usr/bin/gear", "install", TestImage, hostContainerId, "--start", "--ports=8080:0", "--isolate") data, err := cmd.CombinedOutput() c.Log(string(data)) c.Assert(err, chk.IsNil) diff --git a/transport/locator.go b/transport/locator.go new file mode 100644 index 00000000..156b18f6 --- /dev/null +++ b/transport/locator.go @@ -0,0 +1,95 @@ +package transport + +import ( + "errors" + "github.com/openshift/geard/port" + "net" + "strconv" + "strings" +) + +// The reserved identifier for the local transport +const localTransport = "local" + +// The destination of a transport. All transports +// must provide a way to resolve the IP remote hostname. +type Locator interface { + // A string that uniquely identifies this destination + String() string + // Return a valid hostname for this locator + ResolveHostname() (string, error) +} +type Locators []Locator + +// The local transport - test against this variable for +// finding a local interface. +var Local = HostLocator(localTransport) + +// A host port combination representing a remote server - most IP +// transports can use this default implementation. +type HostLocator string + +func (t HostLocator) String() string { + return string(t) +} +func (t HostLocator) IsRemote() bool { + return localTransport != t.String() && "" != t.String() +} +func (t HostLocator) ResolveHostname() (string, error) { + return ResolveLocatorHostname(t.String()) +} + +// Return an object representing an IP host +func NewHostLocator(value string) (HostLocator, error) { + if strings.Contains(value, "/") { + return "", errors.New("Host identifiers may not have a slash") + } + if value == "" || value == localTransport { + return Local, nil + } + + if strings.Contains(value, ":") { + _, portString, err := net.SplitHostPort(value) + if err != nil { + return "", err + } + if portString != "" { + p, err := strconv.Atoi(portString) + if err != nil { + return "", err + } + port := port.Port(p) + if err := port.Check(); err != nil { + return "", err + } + } + } + return HostLocator(value), nil +} + +func ResolveLocatorHostname(value string) (string, error) { + if value != "" && value != Local.String() { + if strings.Contains(value, ":") { + host, _, err := net.SplitHostPort(value) + if err != nil { + return "", err + } + return host, nil + } + return value, nil + } + return "localhost", nil +} + +// Convenience method for converting a set of strings into a list of Locators +func NewTransportLocators(transport Transport, values ...string) (Locators, error) { + out := make(Locators, 0, len(values)) + for i := range values { + r, err := transport.LocatorFor(values[i]) + if err != nil { + return out, err + } + out = append(out, r) + } + return out, nil +} diff --git a/transport/transport.go b/transport/transport.go new file mode 100644 index 00000000..39aeb9ca --- /dev/null +++ b/transport/transport.go @@ -0,0 +1,77 @@ +package transport + +import ( + "errors" + "fmt" + "github.com/openshift/geard/jobs" + "log" +) + +var ErrNotTransportable = errors.New("The specified job cannot be executed remotely") + +// Allow Jobs to be remotely executed. +type Transport interface { + // Return a locator from the given string + LocatorFor(string) (Locator, error) + // Given a locator, return a job that can be executed + // remotely. + RemoteJobFor(Locator, jobs.Job) (jobs.Job, error) +} + +type noTransport struct{} + +func (t *noTransport) LocatorFor(value string) (Locator, error) { + return NewHostLocator(value) +} +func (t *noTransport) RemoteJobFor(locator Locator, job jobs.Job) (jobs.Job, error) { + return job, nil +} + +var emptyTransport = &noTransport{} +var transports = make(map[string]Transport) + +func RegisterTransport(name string, t Transport) { + if t == nil { + log.Printf("Transport for '%s' must not be nil", name) + return + } + transports[name] = t +} + +func GetTransport(name string) (Transport, bool) { + t, ok := transports[name] + return t, ok +} + +func GetTransportNames() []string { + names := make([]string, 0, len(transports)) + for name, _ := range transports { + names = append(names, name) + } + return names +} + +// Implement the flag.Value interface for reading a transport +// from a string. +type TransportFlag struct { + Transport + name string +} + +func (t *TransportFlag) Get() Transport { + return t.Transport +} + +func (t *TransportFlag) String() string { + return t.name +} + +func (t *TransportFlag) Set(s string) error { + value, ok := GetTransport(s) + if !ok { + return errors.New(fmt.Sprintf("No transport defined for '%s'. Valid transports are %v", s, GetTransportNames())) + } + t.name = s + t.Transport = value + return nil +}