Skip to content
This repository has been archived by the owner on Aug 29, 2018. It is now read-only.

Commit

Permalink
Make the client use a "transport" abstraction to talk to server
Browse files Browse the repository at this point in the history
Decouple client -> server from knowing about the particular details
of how data flows back and forth from the server.  Use a consistent
Locator interface for referring to hosts that is generic across
possible transports.
  • Loading branch information
smarterclayton committed May 1, 2014
1 parent d15f27f commit 617b887
Show file tree
Hide file tree
Showing 36 changed files with 1,062 additions and 674 deletions.
46 changes: 46 additions & 0 deletions cmd/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package cmd

import (
"errors"
"fmt"
"github.com/openshift/geard/deployment"
"github.com/openshift/geard/transport"
)

// Return a set of container locators from the specified deployment
// descriptor.
func ExtractContainerLocatorsFromDeployment(t transport.Transport, path string, args *[]string) error {
if path == "" {
return nil
}
deployment, err := deployment.NewDeploymentFromFile(path)
if err != nil {
return err
}
locators, err := LocatorsForDeploymentInstances(t, deployment.Instances.References())
if err != nil {
return err
}
if len(locators) == 0 {
return errors.New(fmt.Sprintf("There are no deployed instances listed in %s", path))
}
for i := range locators {
*args = append(*args, locators[i].Identity())
}
return nil
}

func LocatorsForDeploymentInstances(t transport.Transport, instances deployment.InstanceRefs) (Locators, error) {
locators := make(Locators, 0, len(instances))
for _, instance := range instances {
if instance.On != nil {
locator, err := t.LocatorFor(*instance.On)
if err != nil {
return Locators{}, err
}
resource := &ResourceLocator{ResourceTypeContainer, string(instance.Id), locator}
locators = append(locators, resource)
}
}
return locators, nil
}
63 changes: 26 additions & 37 deletions cmd/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package cmd
import (
"errors"
"fmt"
"github.com/openshift/geard/http"
"github.com/openshift/geard/jobs"
"github.com/openshift/geard/pkg/logstreamer"
"github.com/openshift/geard/transport"
"io"
"io/ioutil"
"log"
"os"
"reflect"
"strings"
"sync"
)
Expand All @@ -27,7 +26,7 @@ type FuncReact func(*CliJobResponse, io.Writer, interface{})
// parallel or sequentially. You must set either .Group
// or .Serial
type Executor struct {
On []Locator
On Locators
// Given a set of locators on the same server, create one
// job that represents all ids.
Group FuncBulk
Expand All @@ -42,6 +41,9 @@ type Executor struct {
OnSuccess FuncReact
// Optional: respond to errors when they occur
OnFailure FuncReact
// Optional: a way to transport a job to a remote server. If not
// specified remote locators will fail
Transport transport.Transport
}

// Invoke the appropriate job on each server and return the set of data
Expand Down Expand Up @@ -106,7 +108,7 @@ func (e Executor) StreamAndExit() {

func (e *Executor) run(gather bool) ([]*CliJobResponse, error) {
on := e.On
local, remote := Locators(on).Group()
local, remote := on.Group()
single := len(on) == 1
responses := []*CliJobResponse{}

Expand All @@ -115,15 +117,20 @@ func (e *Executor) run(gather bool) ([]*CliJobResponse, error) {
if err := localJobs.check(); err != nil {
return responses, err
}
remoteJobs := make([]remoteJobSet, len(remote))
remoteJobs := make([][]remoteJob, len(remote))
for i := range remote {
jobs := e.jobs(remote[i])
locator := remote[i]
jobs := e.jobs(locator)
if err := jobs.check(); err != nil {
return responses, err
}
remotes, err := jobs.remotes()
if err != nil {
return responses, err
remotes := make([]remoteJob, len(jobs))
for j := range jobs {
remote, err := e.Transport.RemoteJobFor(locator[0].TransportLocator(), jobs[j])
if err != nil {
return responses, err
}
remotes[j] = remoteJob{remote, jobs[j], locator[0]}
}
remoteJobs[i] = remotes
}
Expand Down Expand Up @@ -155,30 +162,22 @@ func (e *Executor) run(gather bool) ([]*CliJobResponse, error) {
}()
}

// Executes jobs against each remote server in parallel
// Executes jobs against each remote server in parallel (could parallel to each server if necessary)
for i := range remote {
ids := remote[i]
allJobs := remoteJobs[i]
host := ids[0].HostIdentity()
locator := ids[0].(http.RemoteLocator)
host := ids[0].TransportLocator()

tasks.Add(1)
go func() {
w := logstreamer.NewLogstreamer(stdout, prefixUnless(host+" ", single), false)
logger := log.New(w, "", 0)
w := logstreamer.NewLogstreamer(stdout, prefixUnless(host.String()+" ", single), false)
defer w.Close()
defer tasks.Done()

dispatcher := http.NewHttpDispatcher(locator, logger)
for _, job := range allJobs {
response := &CliJobResponse{Output: w, Gather: gather}
if err := dispatcher.Dispatch(job, response); err != nil {
// set an explicit error
response = &CliJobResponse{
Error: jobs.SimpleJobError{jobs.JobResponseError, fmt.Sprintf("The server did not respond correctly: %s", err.Error())},
}
}
respch <- e.react(response, w, job)
job.Execute(response)
respch <- e.react(response, w, job.Original)
}
}()
}
Expand Down Expand Up @@ -226,7 +225,11 @@ func (e *Executor) jobs(on []Locator) jobSet {
}

type jobSet []jobs.Job
type remoteJobSet []http.RemoteExecutable
type remoteJob struct {
jobs.Job
Original jobs.Job
Locator Locator
}

func (jobs jobSet) check() error {
for i := range jobs {
Expand All @@ -240,20 +243,6 @@ func (jobs jobSet) check() error {
return nil
}

func (jobs jobSet) remotes() (remotes remoteJobSet, err error) {
remotes = make(remoteJobSet, 0, len(remotes))
for i := range jobs {
job := jobs[i]
remotable, ok := job.(http.RemoteExecutable)
if !ok {
err = errors.New(fmt.Sprintf("Unable to run this action (%+v) against a remote server", reflect.TypeOf(job)))
return
}
remotes = append(remotes, remotable)
}
return
}

func Fail(code int, format string, other ...interface{}) {
fmt.Fprintf(os.Stderr, format, other...)
if !strings.HasSuffix(format, "\n") {
Expand Down
135 changes: 135 additions & 0 deletions cmd/executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package cmd_test

import (
"fmt"
. "github.com/openshift/geard/cmd"
"github.com/openshift/geard/jobs"
"github.com/openshift/geard/transport"
"testing"
)

type testLocator struct {
Locator string
}

func (t *testLocator) String() string {
return t.Locator
}
func (t *testLocator) ResolveHostname() (string, error) {
return t.Locator, nil
}

type testTransport struct {
GotLocator string
Translated map[string]jobs.Job
Invoked map[string]jobs.JobResponse
}

func (t *testTransport) LocatorFor(locator string) (transport.Locator, error) {
t.GotLocator = locator
return &testLocator{locator}, nil
}
func (t *testTransport) RemoteJobFor(locator transport.Locator, job jobs.Job) (jobs.Job, error) {
if t.Translated == nil {
t.Translated = make(map[string]jobs.Job)
t.Invoked = make(map[string]jobs.JobResponse)
}
t.Translated[locator.String()] = job
invoked := func(res jobs.JobResponse) {
if _, found := t.Invoked[locator.String()]; found {
panic(fmt.Sprintf("Same job %+v invoked twice under %s", job, locator.String()))
}
t.Invoked[locator.String()] = res
res.Success(jobs.JobResponseOk)
}
return jobs.JobFunction(invoked), nil
}

func TestShouldSendRemoteJob(t *testing.T) {
trans := &testTransport{}
localhost := &testLocator{"localhost"}
initCalled := false
locator := &ResourceLocator{ResourceTypeContainer, "foobar", localhost}

Executor{
On: Locators{locator},
Serial: func(on Locator) jobs.Job {
if on != locator {
t.Fatalf("Expected locator passed to Serial() to be identical to %+v", locator)
}
return &jobs.StoppedContainerStateRequest{
Id: AsIdentifier(on),
}
},
LocalInit: func() error {
initCalled = true
return nil
},
Transport: trans,
}.Gather()

if initCalled {
t.Errorf("Local initialization should be bypassed for remote transports")
}
if _, ok := trans.Translated["localhost"]; !ok {
t.Errorf("Job for localhost was not enqueued in %+v", trans.Invoked)
}
if _, ok := trans.Invoked["localhost"]; !ok {
t.Errorf("Job for localhost was not enqueued in %+v", trans.Invoked)
}
}

func TestShouldExtractLocators(t *testing.T) {
trans := &testTransport{}
args := []string{}
err := ExtractContainerLocatorsFromDeployment(trans, "../deployment/fixtures/mongo_deploy_existing.json", &args)
if err != nil {
t.Fatalf("Expected no error from extract: %+v", err)
}
if len(args) != 3 {
t.Fatalf("Expected args to have 3 locators, not %d", len(args))
}
}

func TestShouldConvertLocator(t *testing.T) {
locator := &ResourceLocator{ResourceTypeContainer, "foobar", &testLocator{"localhost"}}
id := AsIdentifier(locator)
if id == "" {
t.Errorf("Locator should not have error on converting to identifier")
}
}

func TestShouldReadIdentifiersFromArgs(t *testing.T) {
ids, err := NewResourceLocators(&testTransport{}, ResourceTypeContainer, "ctr://localhost/foo", "bart", "ctr://local/bazi")
if err != nil {
t.Errorf("No error should occur reading locators: %s", err.Error())
}
if len(ids) != 3 {
t.Errorf("Should have received 3 ids: %+v", ids)
}
if string(AsIdentifier(ids[0])) != "" {
t.Error("First id should have value '' because foo is too short", ids[0])
}
if string(AsIdentifier(ids[1])) != "bart" {
t.Error("Second id should have value 'bart'", ids[1])
}
if string(AsIdentifier(ids[2])) != "bazi" {
t.Error("Third id should have value 'bazi'", ids[2])
}
}

func TestShoulCheckContainerArgsArgs(t *testing.T) {
ids, err := NewContainerLocators(&testTransport{}, "ctr://localhost/foo")
if err == nil {
t.Errorf("This locator should be invalid: %s", ids[0])
}
ids, err = NewContainerLocators(&testTransport{}, "bar")
if err == nil {
t.Errorf("This locator should be invalid: %s", ids[0])
}
ids, err = NewContainerLocators(&testTransport{}, "ctr://local/baz")
if err == nil {
t.Errorf("This locator should be invalid: %s", ids[0])
}

}
4 changes: 2 additions & 2 deletions cmd/gear/flags.go → cmd/flags.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"crypto/rand"
Expand Down Expand Up @@ -89,7 +89,7 @@ func (e *EnvironmentDescription) ExtractVariablesFrom(args *[]string, generateId
}
env, err := containers.ExtractEnvironmentVariablesFrom(args)
if err != nil {
log.Printf("Failed to extract env")
fmt.Fprintln(os.Stderr, "Failed to extract env: "+err.Error())
return err
}
e.Description.Variables = append(e.Description.Variables, env...)
Expand Down
13 changes: 13 additions & 0 deletions cmd/flags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package cmd_test

import (
. "github.com/openshift/geard/cmd"
"testing"
)

func TestGenerateId(t *testing.T) {
s := GenerateId()
if s == "" {
t.Error("Expected generated ID to be non empty")
}
}
Loading

0 comments on commit 617b887

Please sign in to comment.