diff --git a/cluster/mesos/README.md b/cluster/mesos/README.md deleted file mode 100644 index 9395b9a2e8..0000000000 --- a/cluster/mesos/README.md +++ /dev/null @@ -1,150 +0,0 @@ -# Using Docker Swarm and Mesos - -Swarm comes with a built-in scheduler that works with the Swarm manager to -schedule container resources. You can completely replace the built-in scheduler -with a 3rd party scheduler. For example, you can replace it with the Mesos -scheduler as described here. - -When using Docker Swarm and Mesos, you use the Docker client to ask the Swarm -manager to schedule containers. The Swarm manager then schedules those -containers on a Mesos cluster. - -###### The Docker Swarm on Mesos integration is experimental and **not** production ready, please use with caution. - -## Prerequisites - -Each node in your Swarm must run a Mesos agent. The agent must be capable of -starting tasks in a Docker Container using the `--containerizers=docker` option. - -You need to configure two TCP ports on the agent. One port to listen for the -Swarm manager, for example 2375. And a second TCP port to listen for the Mesos -master, for example 3375. - -## Start the Docker Swarm manager - -If you use a single Mesos master: - -``` -$ docker run -d -p :2375 -p 3375:3375 \ - swarm manage \ - -c mesos-experimental \ - --cluster-opt mesos.address= \ - --cluster-opt mesos.port=3375 \ - : -``` - -The command above creates a Swarm manager listening at ``. -The `` value points to where Mesos master lives in the cluster. -Typically, this is localhost, a hostname, or an IP address. The `` -value is the IP address for Mesos master to talk to Swarm manager. If Mesos master -and Swarm manager are co-located on the same machine, you can use the `0.0.0.0` -or `localhost` value. - -If you use multiple Mesos masters: - -``` -$ docker run -d -p :2375 -p 3375:3375 \ - swarm manage \ - -c mesos-experimental \ - --cluster-opt mesos.address= \ - --cluster-opt mesos.port=3375 \ - zk:// -``` - -Once the manager is running, check your configuration by running `docker info` -as follows: - -``` -$ docker -H tcp://: info -Containers: 0 -Offers: 2 - Offer: 20150609-222929-1327399946-5050-14390-O6286 - └ cpus: 2 - └ mem: 1006 MiB - └ disk: 34.37 GiB - └ ports: 31000-32000 - Offer: 20150609-222929-1327399946-5050-14390-O6287 - └ cpus: 2 - └ mem: 1006 MiB - └ disk: 34.37 GiB - └ ports: 31000-32000 -``` - -If you run into `Abnormal executor termination` error, you might want to run the -Swarm container with an additional environment variable: -`SWARM_MESOS_USER=root`. - -Exporting the environment variable `DOCKER_HOST=:` will facilitate -interaction with the Docker client as it will not require `-H` anymore. - - -# Limitations - -Docker Swarm on Mesos provides a basic Mesos offers negotiation -It is also possible to use Docker-compose to communicate to the Docker Swarm manager how to schedule Docker containers on the Mesos cluster. -Docker Swarm on Mesos supports Mesos master detection with Zookeeper - -## Functionality - -Docker Swarm on Mesos supports the following subset of Docker CLI commands: - -+ attach -+ build -+ commit -+ cp -+ diff -+ events -+ export -+ history -+ images -+ info -+ inspect -+ logs -+ network - * create - * remove - * ls - * inspect - * connect - * disconnect -+ port -+ ps -+ run -+ save -+ search -+ stats -+ top - -## Unsupported docker commands - -The current Swarm scheduler for Mesos implementation is not feature complete. Hence the following Docker CLI commands are unsupported and may cause unpredictable results: - -+ create -+ import -+ kill -+ load -+ login -+ logout -+ pause -+ pull -+ push -+ rename -+ restart -+ rm -+ rmi -+ start -+ stop -+ tag -+ unpause -+ update -+ volume -+ wait - -## Known issues - -- Docker Swarm on Mesos only uses unreserved resources; and if Mesos offers reserved resources, the role info is also ignored which is rejected by Mesos master. See [here](https://github.com/docker/swarm/issues/1618) for a proposal on letting Docker Swarm on Mesos use both un-reserved/reserved resources. -- See [here](https://github.com/docker/swarm/issues/1619) for a proposal on letting Docker Swarm on Mesos use revocable resources. -- Restarted Mesos agents have flaky recovery in conjunction with Docker Swarm. When Mesos agents restart, Mesos master don’t always send offers for those agents to Docker Swarm. This issue seems to be solved with the Swarm 1.1 release. - -To have a global view of the tracked issues please refer to -[Mesos issues in Github](https://github.com/docker/swarm/issues?utf8=%E2%9C%93&q=is%3Aissue+is%3Aopen+mesos). diff --git a/cluster/mesos/agent.go b/cluster/mesos/agent.go deleted file mode 100644 index 48c51a9682..0000000000 --- a/cluster/mesos/agent.go +++ /dev/null @@ -1,79 +0,0 @@ -package mesos - -import ( - "sync" - - "github.com/docker/swarm/cluster" - "github.com/docker/swarm/cluster/mesos/task" - "github.com/mesos/mesos-go/mesosproto" -) - -type agent struct { - sync.RWMutex - - id string - offers map[string]*mesosproto.Offer - tasks map[string]*task.Task - engine *cluster.Engine -} - -func newAgent(sid string, e *cluster.Engine) *agent { - return &agent{ - id: sid, - offers: make(map[string]*mesosproto.Offer), - tasks: make(map[string]*task.Task), - engine: e, - } -} - -func (s *agent) addOffer(offer *mesosproto.Offer) { - s.Lock() - s.offers[offer.Id.GetValue()] = offer - s.Unlock() -} - -func (s *agent) addTask(task *task.Task) { - s.Lock() - s.tasks[task.TaskInfo.TaskId.GetValue()] = task - s.Unlock() -} - -func (s *agent) removeOffer(offerID string) bool { - s.Lock() - defer s.Unlock() - found := false - _, found = s.offers[offerID] - if found { - delete(s.offers, offerID) - } - return found -} - -func (s *agent) removeTask(taskID string) bool { - s.Lock() - defer s.Unlock() - found := false - _, found = s.tasks[taskID] - if found { - delete(s.tasks, taskID) - } - return found -} - -func (s *agent) empty() bool { - s.RLock() - defer s.RUnlock() - return len(s.offers) == 0 && len(s.tasks) == 0 -} - -func (s *agent) getOffers() map[string]*mesosproto.Offer { - s.RLock() - defer s.RUnlock() - return s.offers -} - -func (s *agent) getTasks() map[string]*task.Task { - s.RLock() - defer s.RUnlock() - return s.tasks -} diff --git a/cluster/mesos/agent_test.go b/cluster/mesos/agent_test.go deleted file mode 100644 index 49ac43a445..0000000000 --- a/cluster/mesos/agent_test.go +++ /dev/null @@ -1,100 +0,0 @@ -package mesos - -import ( - "testing" - "time" - - containertypes "github.com/docker/docker/api/types/container" - networktypes "github.com/docker/docker/api/types/network" - "github.com/docker/swarm/cluster" - "github.com/docker/swarm/cluster/mesos/task" - "github.com/mesos/mesos-go/mesosutil" - "github.com/stretchr/testify/assert" -) - -func TestNewAgent(t *testing.T) { - s := newAgent("SID", nil) - - assert.Equal(t, s.id, "SID") - assert.Empty(t, s.offers) - assert.Empty(t, s.tasks) - assert.True(t, s.empty()) -} - -func TestAddOffer(t *testing.T) { - s := newAgent("SID", nil) - - assert.Empty(t, s.offers) - assert.True(t, s.empty()) - - s.addOffer(mesosutil.NewOffer(mesosutil.NewOfferID("ID1"), nil, nil, "hostname1")) - s.addOffer(mesosutil.NewOffer(mesosutil.NewOfferID("ID2"), nil, nil, "hostname1")) - assert.Equal(t, len(s.offers), 2) - assert.Equal(t, len(s.getOffers()), 2) - assert.False(t, s.empty()) - - s.addOffer(mesosutil.NewOffer(mesosutil.NewOfferID("ID1"), nil, nil, "hostname1")) - assert.Equal(t, len(s.offers), 2) - assert.Equal(t, len(s.getOffers()), 2) -} - -func TestAddTask(t *testing.T) { - s := newAgent("SID", nil) - - assert.Empty(t, s.tasks) - assert.True(t, s.empty()) - - t1, err := task.NewTask(cluster.BuildContainerConfig(containertypes.Config{}, containertypes.HostConfig{}, networktypes.NetworkingConfig{}), "task1", 5*time.Second) - assert.NoError(t, err) - s.addTask(t1) - - t2, err := task.NewTask(cluster.BuildContainerConfig(containertypes.Config{}, containertypes.HostConfig{}, networktypes.NetworkingConfig{}), "task1", 5*time.Second) - assert.NoError(t, err) - s.addTask(t2) - assert.Equal(t, len(s.tasks), 2) - assert.Equal(t, len(s.getTasks()), 2) - assert.False(t, s.empty()) - - s.addTask(t1) - assert.Equal(t, len(s.tasks), 2) - assert.Equal(t, len(s.getTasks()), 2) -} - -func TestRemoveOffer(t *testing.T) { - s := newAgent("SID", nil) - - assert.Empty(t, s.offers) - - s.addOffer(mesosutil.NewOffer(mesosutil.NewOfferID("ID1"), nil, nil, "hostname1")) - s.addOffer(mesosutil.NewOffer(mesosutil.NewOfferID("ID2"), nil, nil, "hostname1")) - assert.Equal(t, len(s.offers), 2) - assert.Equal(t, len(s.getOffers()), 2) - - assert.True(t, s.removeOffer("ID1")) - assert.Equal(t, len(s.offers), 1) - assert.Equal(t, len(s.getOffers()), 1) - - assert.False(t, s.removeOffer("ID1")) -} - -func TestRemoveTask(t *testing.T) { - s := newAgent("SID", nil) - - assert.Empty(t, s.tasks) - - t1, err := task.NewTask(cluster.BuildContainerConfig(containertypes.Config{}, containertypes.HostConfig{}, networktypes.NetworkingConfig{}), "task1", 5*time.Second) - assert.NoError(t, err) - s.addTask(t1) - - t2, err := task.NewTask(cluster.BuildContainerConfig(containertypes.Config{}, containertypes.HostConfig{}, networktypes.NetworkingConfig{}), "task1", 5*time.Second) - assert.NoError(t, err) - s.addTask(t2) - assert.Equal(t, len(s.tasks), 2) - assert.Equal(t, len(s.getTasks()), 2) - - assert.True(t, s.removeTask(t1.TaskId.GetValue())) - assert.Equal(t, len(s.tasks), 1) - assert.Equal(t, len(s.getTasks()), 1) - - assert.False(t, s.removeTask(t1.TaskId.GetValue())) -} diff --git a/cluster/mesos/cluster.go b/cluster/mesos/cluster.go deleted file mode 100644 index 73585078a4..0000000000 --- a/cluster/mesos/cluster.go +++ /dev/null @@ -1,733 +0,0 @@ -package mesos - -import ( - "crypto/tls" - "encoding/json" - "errors" - "flag" - "fmt" - "io" - "os" - "sort" - "strings" - "sync" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/docker/docker/api/types" - containertypes "github.com/docker/docker/api/types/container" - networktypes "github.com/docker/docker/api/types/network" - volumetypes "github.com/docker/docker/api/types/volume" - "github.com/docker/swarm/cluster" - "github.com/docker/swarm/cluster/mesos/task" - "github.com/docker/swarm/scheduler" - "github.com/docker/swarm/scheduler/node" - "github.com/gogo/protobuf/proto" - "github.com/mesos/mesos-go/mesosproto" - mesosscheduler "github.com/mesos/mesos-go/scheduler" -) - -// Cluster struct for mesos -type Cluster struct { - sync.RWMutex - - cluster.ClusterEventHandlers - - dockerEnginePort string - master string - agents map[string]*agent - scheduler *Scheduler - TLSConfig *tls.Config - options *cluster.DriverOpts - offerTimeout time.Duration - refuseTimeout time.Duration - taskCreationTimeout time.Duration - pendingTasks *task.Tasks - engineOpts *cluster.EngineOpts -} - -const ( - frameworkName = "swarm" - defaultDockerEnginePort = "2375" - defaultDockerEngineTLSPort = "2376" - dockerPortAttribute = "docker_port" - defaultOfferTimeout = 30 * time.Second - defaultRefuseTimeout = 5 * time.Second - defaultTaskCreationTimeout = 5 * time.Second -) - -var ( - errNotSupported = errors.New("not supported with mesos") - errResourcesNeeded = errors.New("resources constraints (-c and/or -m) are required by mesos") -) - -// NewCluster for mesos Cluster creation -func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master string, options cluster.DriverOpts, engineOptions *cluster.EngineOpts) (cluster.Cluster, error) { - log.WithFields(log.Fields{"name": "mesos"}).Debug("Initializing cluster") - - // Enabling mesos-go glog logging - if log.GetLevel() == log.DebugLevel { - flag.Lookup("logtostderr").Value.Set("true") - } - cluster := &Cluster{ - ClusterEventHandlers: cluster.NewClusterEventHandlers(), - dockerEnginePort: defaultDockerEnginePort, - master: master, - agents: make(map[string]*agent), - TLSConfig: TLSConfig, - options: &options, - offerTimeout: defaultOfferTimeout, - taskCreationTimeout: defaultTaskCreationTimeout, - engineOpts: engineOptions, - refuseTimeout: defaultRefuseTimeout, - } - - cluster.pendingTasks = task.NewTasks(cluster) - - // Empty string is accepted by the scheduler. - user, _ := options.String("mesos.user", "SWARM_MESOS_USER") - - // Override the hostname here because mesos-go will try - // to shell out to the hostname binary and it won't work with our official image. - // Do not check error here, so mesos-go can still try. - hostname, _ := os.Hostname() - - driverConfig := mesosscheduler.DriverConfig{ - Framework: &mesosproto.FrameworkInfo{Name: proto.String(frameworkName), User: &user}, - Master: cluster.master, - HostnameOverride: hostname, - } - - if taskCreationTimeout, ok := options.String("mesos.tasktimeout", "SWARM_MESOS_TASK_TIMEOUT"); ok { - d, err := time.ParseDuration(taskCreationTimeout) - if err != nil { - return nil, err - } - cluster.taskCreationTimeout = d - } - // Changing port for https - if cluster.TLSConfig != nil { - cluster.dockerEnginePort = defaultDockerEngineTLSPort - } - - if bindingPort, ok := options.Uint("mesos.port", "SWARM_MESOS_PORT"); ok { - driverConfig.BindingPort = uint16(bindingPort) - } - - if bindingAddress, ok := options.IP("mesos.address", "SWARM_MESOS_ADDRESS"); ok { - if bindingAddress == nil { - value, _ := options.String("mesos.address", "SWARM_MESOS_ADDRESS") - return nil, fmt.Errorf( - "invalid IP address for cluster-opt mesos.address: \"%s\"", - value) - } - driverConfig.BindingAddress = bindingAddress - } - - if checkpointFailover, ok := options.Bool("mesos.checkpointfailover", "SWARM_MESOS_CHECKPOINT_FAILOVER"); ok { - driverConfig.Framework.Checkpoint = &checkpointFailover - } - - if offerTimeout, ok := options.String("mesos.offertimeout", "SWARM_MESOS_OFFER_TIMEOUT"); ok { - d, err := time.ParseDuration(offerTimeout) - if err != nil { - return nil, err - } - cluster.offerTimeout = d - } - - if refuseTimeout, ok := options.String("mesos.offerrefusetimeout", "SWARM_MESOS_OFFER_REFUSE_TIMEOUT"); ok { - d, err := time.ParseDuration(refuseTimeout) - if err != nil { - return nil, err - } - cluster.refuseTimeout = d - } - - sched, err := NewScheduler(driverConfig, cluster, scheduler) - if err != nil { - return nil, err - } - - cluster.scheduler = sched - status, err := sched.driver.Start() - if err != nil { - log.Debugf("Mesos driver started, status/err %v: %v", status, err) - return nil, err - } - log.Debugf("Mesos driver started, status %v", status) - - go func() { - status, err := sched.driver.Join() - log.Debugf("Mesos driver stopped unexpectedly, status/err %v: %v", status, err) - - }() - - return cluster, nil -} - -// NewAPIEventHandler creates a new API events handler -func (c *Cluster) NewAPIEventHandler() *cluster.APIEventHandler { - return cluster.NewAPIEventHandler() -} - -// StartContainer starts a container -func (c *Cluster) StartContainer(container *cluster.Container) error { - // if the container was started less than a second ago in detach mode, do not start it - if time.Now().Unix()-container.Created > 1 || container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.detach"] != "true" { - return container.Engine.StartContainer(container) - } - return nil -} - -// CreateContainer for container creation in Mesos task -func (c *Cluster) CreateContainer(config *cluster.ContainerConfig, name string, authConfig *types.AuthConfig) (*cluster.Container, error) { - if config.HostConfig.Memory == 0 && config.HostConfig.CPUShares == 0 { - return nil, errResourcesNeeded - } - - if !c.checkNameUniqueness(name) { - return nil, fmt.Errorf("Conflict: The name %s is already assigned or in pending tasks. You have to delete (or rename) that container to be able to assign %s to a container again.", name, name) - } - - task, err := task.NewTask(config, name, c.taskCreationTimeout) - if err != nil { - return nil, err - } - - go c.pendingTasks.Add(task) - - select { - case container := <-task.GetContainer(): - return formatContainer(container), nil - case err := <-task.Error: - c.pendingTasks.Remove(task) - return nil, err - } -} - -// RemoveContainer removes containers on mesos cluster -func (c *Cluster) RemoveContainer(container *cluster.Container, force, volumes bool) error { - c.scheduler.Lock() - defer c.scheduler.Unlock() - - return container.Engine.RemoveContainer(container, force, volumes) -} - -// Images returns all the images in the cluster. -func (c *Cluster) Images() cluster.Images { - c.RLock() - defer c.RUnlock() - - out := []*cluster.Image{} - for _, s := range c.agents { - out = append(out, s.engine.Images()...) - } - return out -} - -// Image returns an image with IdOrName in the cluster -func (c *Cluster) Image(IDOrName string) *cluster.Image { - // Abort immediately if the name is empty. - if len(IDOrName) == 0 { - return nil - } - - c.RLock() - defer c.RUnlock() - - for _, s := range c.agents { - if image := s.engine.Image(IDOrName); image != nil { - return image - } - } - return nil -} - -// RemoveImages removes images from the cluster -func (c *Cluster) RemoveImages(name string, force bool) ([]types.ImageDeleteResponseItem, error) { - return nil, errNotSupported -} - -// CreateNetwork creates a network in the cluster -func (c *Cluster) CreateNetwork(name string, request *types.NetworkCreate) (*types.NetworkCreateResponse, error) { - var ( - parts = strings.SplitN(name, "/", 2) - config = &cluster.ContainerConfig{} - ) - - if len(parts) == 2 { - // a node was specified, create the container only on this node - name = parts[1] - config = cluster.BuildContainerConfig(containertypes.Config{Env: []string{"constraint:node==" + parts[0]}}, containertypes.HostConfig{}, networktypes.NetworkingConfig{}) - } - - c.scheduler.Lock() - nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), config) - c.scheduler.Unlock() - if err != nil { - return nil, err - } - if nodes == nil { - return nil, errors.New("cannot find node to create network") - } - n := nodes[0] - s, ok := c.agents[n.ID] - if !ok { - return nil, fmt.Errorf("Unable to create network on agent %q", n.ID) - } - resp, err := s.engine.CreateNetwork(name, request) - c.refreshNetworks() - return resp, err -} - -func (c *Cluster) refreshNetworks() { - var wg sync.WaitGroup - for _, s := range c.agents { - e := s.engine - wg.Add(1) - go func(e *cluster.Engine) { - e.RefreshNetworks() - wg.Done() - }(e) - } - wg.Wait() -} - -// CreateVolume creates a volume in the cluster -func (c *Cluster) CreateVolume(request *volumetypes.VolumesCreateBody) (*types.Volume, error) { - return nil, errNotSupported -} - -// RemoveNetwork removes network from the cluster -func (c *Cluster) RemoveNetwork(network *cluster.Network) error { - err := network.Engine.RemoveNetwork(network) - c.refreshNetworks() - return err -} - -// RemoveVolumes removes volumes from the cluster -func (c *Cluster) RemoveVolumes(name string) (bool, error) { - return false, errNotSupported -} - -func formatContainer(container *cluster.Container) *cluster.Container { - if container == nil { - return nil - } - if name := container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.name"]; name != "" && container.Names[0] != "/"+name { - container.Names = append([]string{"/" + name}, container.Names...) - } - return container -} - -// Containers returns all the containers in the cluster. -func (c *Cluster) Containers() cluster.Containers { - c.RLock() - defer c.RUnlock() - - out := cluster.Containers{} - for _, s := range c.agents { - for _, container := range s.engine.Containers() { - if container.Config.Labels != nil { - if _, ok := container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.task"]; ok { - out = append(out, formatContainer(container)) - } - } - } - } - - return out -} - -// Container returns the container with IdOrName in the cluster -func (c *Cluster) Container(IDOrName string) *cluster.Container { - // Abort immediately if the name is empty. - if len(IDOrName) == 0 { - return nil - } - - return formatContainer(cluster.Containers(c.Containers()).Get(IDOrName)) -} - -// RemoveImage removes an image from the cluster -func (c *Cluster) RemoveImage(image *cluster.Image) ([]types.ImageDeleteResponseItem, error) { - return nil, errNotSupported -} - -// Pull pulls images on the cluster nodes -func (c *Cluster) Pull(name string, authConfig *types.AuthConfig, callback func(msg cluster.JSONMessageWrapper)) { - -} - -// Load images -func (c *Cluster) Load(imageReader io.Reader, callback func(msg cluster.JSONMessageWrapper)) { - -} - -// Import image -func (c *Cluster) Import(source string, ref string, tag string, imageReader io.Reader, callback func(msg cluster.JSONMessageWrapper)) { - -} - -// RenameContainer renames a container -func (c *Cluster) RenameContainer(container *cluster.Container, newName string) error { - //FIXME this doesn't work as the next refreshcontainer will erase this change (this change is in-memory only) - - if !c.checkNameUniqueness(newName) { - return fmt.Errorf("Conflict: The name %s is already assigned, or in pending tasks. You have to delete (or rename) that container to be able to assign %s to a container again.", newName, newName) - } - - container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.name"] = newName - - return nil -} - -// Networks returns all the networks in the cluster. -func (c *Cluster) Networks() cluster.Networks { - c.RLock() - defer c.RUnlock() - - out := cluster.Networks{} - for _, s := range c.agents { - out = append(out, s.engine.Networks()...) - } - - return out - -} - -// Volumes returns all the volumes in the cluster. -func (c *Cluster) Volumes() cluster.Volumes { - return nil -} - -// listNodes returns all the nodes in the cluster. -func (c *Cluster) listNodes() []*node.Node { - c.RLock() - defer c.RUnlock() - - out := []*node.Node{} - for _, s := range c.agents { - n := node.NewNode(s.engine) - n.ID = s.id - n.TotalCpus = int64(sumScalarResourceValue(s.offers, "cpus")) - n.UsedCpus = 0 - n.TotalMemory = int64(sumScalarResourceValue(s.offers, "mem")) * 1024 * 1024 - n.UsedMemory = 0 - out = append(out, n) - } - return out -} - -func (c *Cluster) listOffers() []*mesosproto.Offer { - c.RLock() - defer c.RUnlock() - - list := []*mesosproto.Offer{} - for _, s := range c.agents { - for _, offer := range s.offers { - list = append(list, offer) - } - } - return list -} - -// TotalMemory returns the total memory of the cluster -func (c *Cluster) TotalMemory() int64 { - c.RLock() - defer c.RUnlock() - var totalMemory int64 - for _, s := range c.agents { - totalMemory += int64(sumScalarResourceValue(s.offers, "mem")) * 1024 * 1024 - } - return totalMemory -} - -// TotalCpus returns the total memory of the cluster -func (c *Cluster) TotalCpus() int64 { - c.RLock() - defer c.RUnlock() - var totalCpus int64 - for _, s := range c.agents { - totalCpus += int64(sumScalarResourceValue(s.offers, "cpus")) - } - return totalCpus -} - -// Info gives minimal information about containers and resources on the mesos cluster -func (c *Cluster) Info() [][2]string { - offers := c.listOffers() - info := [][2]string{ - {"Strategy", c.scheduler.Strategy()}, - {"Filters", c.scheduler.Filters()}, - {"Offers", fmt.Sprintf("%d", len(offers))}, - } - - sort.Sort(offerSorter(offers)) - - for _, offer := range offers { - info = append(info, [2]string{" Offer", offer.Id.GetValue()}) - for _, resource := range offer.Resources { - info = append(info, [2]string{" └ " + resource.GetName(), formatResource(resource)}) - } - } - - return info -} - -func (c *Cluster) addOffer(offer *mesosproto.Offer) { - s, ok := c.agents[offer.SlaveId.GetValue()] - if !ok { - return - } - s.addOffer(offer) - go func(offer *mesosproto.Offer) { - time.Sleep(c.offerTimeout) - // declining Mesos offers to make them available to other Mesos services - if c.removeOffer(offer) { - if _, err := c.scheduler.driver.DeclineOffer(offer.Id, &mesosproto.Filters{}); err != nil { - log.WithFields(log.Fields{"name": "mesos"}).Errorf("Error while declining offer %q: %v", offer.Id.GetValue(), err) - } else { - log.WithFields(log.Fields{"name": "mesos"}).Debugf("Offer %q declined successfully", offer.Id.GetValue()) - } - } - }(offer) -} - -func (c *Cluster) removeOffer(offer *mesosproto.Offer) bool { - log.WithFields(log.Fields{"name": "mesos", "offerID": offer.Id.String()}).Debug("Removing offer") - s, ok := c.agents[offer.SlaveId.GetValue()] - if !ok { - return false - } - found := s.removeOffer(offer.Id.GetValue()) - if s.empty() { - // Disconnect from engine - s.engine.Disconnect() - delete(c.agents, offer.SlaveId.GetValue()) - } - return found -} - -// LaunchTask selects node and calls driver to launch a task -func (c *Cluster) LaunchTask(t *task.Task) bool { - c.scheduler.Lock() - //change to explicit lock defer c.scheduler.Unlock() - - nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), t.GetConfig()) - if err != nil { - c.scheduler.Unlock() - return false - } - n := nodes[0] - s, ok := c.agents[n.ID] - if !ok { - t.Error <- fmt.Errorf("Unable to create on agent %q", n.ID) - c.scheduler.Unlock() - return true - } - - // build the offer from its internal config and set the agentID - - c.Lock() - // TODO: Only use the offer we need - offerIDs := []*mesosproto.OfferID{} - for _, offer := range c.agents[n.ID].offers { - offerIDs = append(offerIDs, offer.Id) - } - - t.Build(n.ID, c.agents[n.ID].offers) - - offerFilters := &mesosproto.Filters{} - refuseSeconds := c.refuseTimeout.Seconds() - offerFilters.RefuseSeconds = &refuseSeconds - - if _, err := c.scheduler.driver.LaunchTasks(offerIDs, []*mesosproto.TaskInfo{&t.TaskInfo}, offerFilters); err != nil { - // TODO: Do not erase all the offers, only the one used - for _, offer := range s.offers { - c.removeOffer(offer) - } - c.Unlock() - c.scheduler.Unlock() - t.Error <- err - return true - } - - s.addTask(t) - - // TODO: Do not erase all the offers, only the one used - for _, offer := range s.offers { - c.removeOffer(offer) - } - c.Unlock() - c.scheduler.Unlock() - // block until we get the container - finished, data, err := t.Monitor() - taskID := t.TaskInfo.TaskId.GetValue() - if err != nil { - //remove task - s.removeTask(taskID) - t.Error <- err - return true - } - if !finished { - go func() { - for { - finished, _, err := t.Monitor() - if err != nil { - // TODO do a better log by sending proper error message - log.Error(err) - break - } - if finished { - break - } - } - //remove the task once it's finished - }() - } - - // Register the container immediately while waiting for a state refresh. - - // In mesos 0.23+ the docker inspect will be sent back in the taskStatus.Data - // We can use this to find the right container. - inspect := []types.ContainerJSONBase{} - if data != nil && json.Unmarshal(data, &inspect) == nil && len(inspect) == 1 { - container := &cluster.Container{Container: types.Container{ID: inspect[0].ID}, Engine: s.engine} - if container, err := container.Refresh(); err == nil { - if !t.Stopped() { - t.SetContainer(container) - } - return true - } - } - - log.Debug("Cannot parse docker info from task status, please upgrade Mesos to the latest version") - // For mesos <= 0.22 we fallback to a full refresh + using labels - // TODO: once 0.23 or 0.24 is released, remove all this block of code as it - // doesn't scale very well. - s.engine.RefreshContainers(true) - - for _, container := range s.engine.Containers() { - if container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] == taskID { - if !t.Stopped() { - t.SetContainer(container) - } - return true - } - } - - if !t.Stopped() { - t.Error <- fmt.Errorf("Container failed to create") - } - return true -} - -// RANDOMENGINE returns a random engine. -func (c *Cluster) RANDOMENGINE() (*cluster.Engine, error) { - c.RLock() - defer c.RUnlock() - - nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), &cluster.ContainerConfig{}) - if err != nil { - return nil, err - } - n := nodes[0] - return c.agents[n.ID].engine, nil -} - -// BuildImage builds an image -func (c *Cluster) BuildImage(buildContext io.Reader, buildImage *types.ImageBuildOptions, callback func(msg cluster.JSONMessageWrapper)) error { - c.scheduler.Lock() - - // get an engine - config := &cluster.ContainerConfig{ - HostConfig: containertypes.HostConfig{ - Resources: containertypes.Resources{ - CPUShares: buildImage.CPUShares, - Memory: buildImage.Memory, - }, - }, - } - nodes, err := c.scheduler.SelectNodesForContainer(c.listNodes(), config) - c.scheduler.Unlock() - if err != nil { - return err - } - n := nodes[0] - engine := c.agents[n.ID].engine - - var engineCallback func(msg cluster.JSONMessage) - if callback != nil { - engineCallback = func(msg cluster.JSONMessage) { - callback(cluster.JSONMessageWrapper{ - EngineName: engine.Name, - Msg: msg, - }) - } - } - err = engine.BuildImage(buildContext, buildImage, engineCallback) - if callback != nil { - if err != nil { - callback(cluster.JSONMessageWrapper{ - EngineName: engine.Name, - Err: err, - }) - } else { - callback(cluster.JSONMessageWrapper{ - EngineName: engine.Name, - Success: true, - }) - } - } - - engine.RefreshImages() - return nil -} - -// TagImage tags an image -func (c *Cluster) TagImage(IDOrName string, ref string, force bool) error { - return errNotSupported -} - -func (c *Cluster) checkNameUniqueness(name string) bool { - // Abort immediately if the name is empty. - if len(name) == 0 { - return true - } - - c.RLock() - defer c.RUnlock() - - for _, s := range c.agents { - for _, container := range s.engine.Containers() { - for _, cname := range container.Names { - if cname == name || cname == "/"+name { - return false - } - } - } - } - - for _, task := range c.pendingTasks.Tasks { - config := task.GetConfig() - if config.Labels != nil { - if tname, ok := config.Labels[cluster.SwarmLabelNamespace+".mesos.name"]; ok { - if tname == name || tname == "/"+name { - return false - } - } - } - } - - return true -} - -func (c *Cluster) RefreshEngine(hostname string) error { - return nil -} - -func (c *Cluster) RefreshEngines() error { - return nil -} diff --git a/cluster/mesos/cluster_test.go b/cluster/mesos/cluster_test.go deleted file mode 100644 index c3e7aa1d0b..0000000000 --- a/cluster/mesos/cluster_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package mesos - -import ( - "testing" - "time" - - "github.com/docker/docker/api/types" - containertypes "github.com/docker/docker/api/types/container" - networktypes "github.com/docker/docker/api/types/network" - "github.com/docker/swarm/cluster" - "github.com/stretchr/testify/assert" -) - -func createAgent(t *testing.T, ID string, containers ...*cluster.Container) *agent { - engOpts := &cluster.EngineOpts{ - RefreshMinInterval: time.Duration(30) * time.Second, - RefreshMaxInterval: time.Duration(60) * time.Second, - FailureRetry: 3, - } - engine := cluster.NewEngine(ID, 0, engOpts) - engine.Name = ID - engine.ID = ID - - for _, container := range containers { - container.Engine = engine - engine.AddContainer(container) - } - - return newAgent("agent-"+ID, engine) -} - -func TestContainerLookup(t *testing.T) { - c := &Cluster{ - agents: make(map[string]*agent), - } - - container1 := &cluster.Container{ - Container: types.Container{ - ID: "container1-id", - Names: []string{"/container1-name1", "/container1-name2"}, - }, - Config: cluster.BuildContainerConfig(containertypes.Config{ - Labels: map[string]string{ - "com.docker.swarm.mesos.task": "task1-id", - "com.docker.swarm.mesos.name": "container1-name", - }, - }, containertypes.HostConfig{}, networktypes.NetworkingConfig{}), - } - - container2 := &cluster.Container{ - Container: types.Container{ - ID: "container2-id", - Names: []string{"/con"}, - }, - Config: cluster.BuildContainerConfig(containertypes.Config{ - Labels: map[string]string{ - "com.docker.swarm.mesos.task": "task2-id", - "com.docker.swarm.mesos.name": "con", - }, - }, containertypes.HostConfig{}, networktypes.NetworkingConfig{}), - } - - container3 := &cluster.Container{ - Container: types.Container{ - ID: "container3-id", - Names: []string{"/container3-name"}, - }, - Config: cluster.BuildContainerConfig(containertypes.Config{}, containertypes.HostConfig{}, networktypes.NetworkingConfig{}), - } - - s := createAgent(t, "test-engine", container1, container2, container3) - c.agents[s.id] = s - - // Hide container without `com.docker.swarm.mesos.task` - assert.Equal(t, len(c.Containers()), 2) - - // Invalid lookup - assert.Nil(t, c.Container("invalid-id")) - assert.Nil(t, c.Container("")) - // Container ID lookup. - assert.NotNil(t, c.Container("container1-id")) - // Container ID prefix lookup. - assert.NotNil(t, c.Container("container1-")) - assert.Nil(t, c.Container("container")) - // Container name lookup. - assert.NotNil(t, c.Container("container1-name1")) - assert.NotNil(t, c.Container("container1-name2")) - // Container engine/name matching. - assert.NotNil(t, c.Container("test-engine/container1-name1")) - assert.NotNil(t, c.Container("test-engine/container1-name2")) - // Match name before ID prefix - cc := c.Container("con") - assert.NotNil(t, cc) - assert.Equal(t, cc.ID, "container2-id") -} diff --git a/cluster/mesos/offer_sorter.go b/cluster/mesos/offer_sorter.go deleted file mode 100644 index 135ce08978..0000000000 --- a/cluster/mesos/offer_sorter.go +++ /dev/null @@ -1,23 +0,0 @@ -package mesos - -import "github.com/mesos/mesos-go/mesosproto" - -// OfferSorter implements the Sort interface to sort offers. -// It is not guaranteed to be a stable sort. -type offerSorter []*mesosproto.Offer - -// Len returns the number of engines to be sorted. -func (s offerSorter) Len() int { - return len(s) -} - -// Swap exchanges the engine elements with indices i and j. -func (s offerSorter) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -// Less reports whether the engine with index i should sort before the engine with index j. -// Offers are sorted chronologically by name. -func (s offerSorter) Less(i, j int) bool { - return s[i].Id.GetValue() < s[j].Id.GetValue() -} diff --git a/cluster/mesos/offer_sorter_test.go b/cluster/mesos/offer_sorter_test.go deleted file mode 100644 index 2c8375fb66..0000000000 --- a/cluster/mesos/offer_sorter_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package mesos - -import ( - "sort" - "testing" - - "github.com/gogo/protobuf/proto" - "github.com/mesos/mesos-go/mesosproto" - "github.com/stretchr/testify/assert" -) - -func TestOfferSorter(t *testing.T) { - offers := []*mesosproto.Offer{ - {Id: &mesosproto.OfferID{Value: proto.String("id1")}}, - {Id: &mesosproto.OfferID{Value: proto.String("id3")}}, - {Id: &mesosproto.OfferID{Value: proto.String("id2")}}, - } - - sort.Sort(offerSorter(offers)) - - assert.Equal(t, offers[0].Id.GetValue(), "id1") - assert.Equal(t, offers[1].Id.GetValue(), "id2") - assert.Equal(t, offers[2].Id.GetValue(), "id3") -} diff --git a/cluster/mesos/scheduler.go b/cluster/mesos/scheduler.go deleted file mode 100644 index be93cf3c39..0000000000 --- a/cluster/mesos/scheduler.go +++ /dev/null @@ -1,146 +0,0 @@ -package mesos - -import ( - "fmt" - - log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/cluster" - "github.com/docker/swarm/scheduler" - "github.com/mesos/mesos-go/mesosproto" - mesosscheduler "github.com/mesos/mesos-go/scheduler" -) - -// Scheduler structure for mesos driver -type Scheduler struct { - scheduler.Scheduler - - driver *mesosscheduler.MesosSchedulerDriver - cluster *Cluster -} - -// NewScheduler for Scheduler mesos driver creation -func NewScheduler(config mesosscheduler.DriverConfig, cluster *Cluster, sched *scheduler.Scheduler) (*Scheduler, error) { - scheduler := Scheduler{ - Scheduler: *sched, - cluster: cluster, - } - - config.Scheduler = &scheduler - driver, err := mesosscheduler.NewMesosSchedulerDriver(config) - if err != nil { - return nil, err - } - scheduler.driver = driver - return &scheduler, nil -} - -// Registered method for registered mesos framework -func (s *Scheduler) Registered(driver mesosscheduler.SchedulerDriver, fwID *mesosproto.FrameworkID, masterInfo *mesosproto.MasterInfo) { - log.WithFields(log.Fields{"name": "mesos", "frameworkId": fwID.GetValue()}).Debug("Framework registered") -} - -// Reregistered method for registered mesos framework -func (s *Scheduler) Reregistered(mesosscheduler.SchedulerDriver, *mesosproto.MasterInfo) { - log.WithFields(log.Fields{"name": "mesos"}).Debug("Framework re-registered") -} - -// Disconnected method -func (s *Scheduler) Disconnected(mesosscheduler.SchedulerDriver) { - log.WithFields(log.Fields{"name": "mesos"}).Debug("Framework disconnected") -} - -// ResourceOffers method -func (s *Scheduler) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mesosproto.Offer) { - log.WithFields(log.Fields{"name": "mesos", "offers": len(offers)}).Debug("Offers received") - - for _, offer := range offers { - agentID := offer.SlaveId.GetValue() - dockerPort := s.cluster.dockerEnginePort - - for _, attribute := range offer.GetAttributes() { - if attribute.GetName() == dockerPortAttribute { - switch attribute.GetType() { - case mesosproto.Value_SCALAR: - dockerPort = fmt.Sprintf("%d", int(attribute.GetScalar().GetValue())) - case mesosproto.Value_TEXT: - dockerPort = attribute.GetText().GetValue() - } - } - } - - a, ok := s.cluster.agents[agentID] - if !ok { - engine := cluster.NewEngine(*offer.Hostname+":"+dockerPort, 0, s.cluster.engineOpts) - if err := engine.Connect(s.cluster.TLSConfig); err != nil { - log.Error(err) - } else { - // Set engine state to healthy and start refresh loop - engine.ValidationComplete() - a = newAgent(agentID, engine) - s.cluster.agents[agentID] = a - if err := a.engine.RegisterEventHandler(s.cluster); err != nil { - log.Error(err) - } - } - - } - s.cluster.addOffer(offer) - - } - go s.cluster.pendingTasks.Process() -} - -// OfferRescinded method -func (s *Scheduler) OfferRescinded(_ mesosscheduler.SchedulerDriver, offerID *mesosproto.OfferID) { - log.WithFields(log.Fields{"name": "mesos", "OfferID": offerID.GetValue()}).Debug("Offer Rescinded") - - for _, agent := range s.cluster.agents { - if offer, ok := agent.offers[offerID.GetValue()]; ok { - s.cluster.removeOffer(offer) - break - } - } -} - -// StatusUpdate method -func (s *Scheduler) StatusUpdate(_ mesosscheduler.SchedulerDriver, taskStatus *mesosproto.TaskStatus) { - log.WithFields(log.Fields{"name": "mesos", "state": taskStatus.State.String()}).Debug("Status update") - taskID := taskStatus.TaskId.GetValue() - agentID := taskStatus.SlaveId.GetValue() - a, ok := s.cluster.agents[agentID] - if !ok { - return - } - if task, ok := a.tasks[taskID]; ok { - task.SendStatus(taskStatus) - } else { - var reason = "" - if taskStatus.Reason != nil { - reason = taskStatus.GetReason().String() - } - - log.WithFields(log.Fields{ - "name": "mesos", - "state": taskStatus.State.String(), - "agentId": taskStatus.SlaveId.GetValue(), - "reason": reason, - }).Warn("Status update received for unknown agent") - } -} - -// FrameworkMessage method -func (s *Scheduler) FrameworkMessage(mesosscheduler.SchedulerDriver, *mesosproto.ExecutorID, *mesosproto.SlaveID, string) { -} - -// SlaveLost method -func (s *Scheduler) SlaveLost(mesosscheduler.SchedulerDriver, *mesosproto.SlaveID) { -} - -// ExecutorLost method -func (s *Scheduler) ExecutorLost(mesosscheduler.SchedulerDriver, *mesosproto.ExecutorID, *mesosproto.SlaveID, int) { -} - -// Error method -func (s *Scheduler) Error(d mesosscheduler.SchedulerDriver, msg string) { - log.WithFields(log.Fields{"name": "mesos"}).Error(msg) -} diff --git a/cluster/mesos/task/task.go b/cluster/mesos/task/task.go deleted file mode 100644 index 2e695875ed..0000000000 --- a/cluster/mesos/task/task.go +++ /dev/null @@ -1,239 +0,0 @@ -package task - -import ( - "errors" - "fmt" - "strconv" - "strings" - "time" - - log "github.com/Sirupsen/logrus" - "github.com/docker/docker/pkg/stringid" - "github.com/docker/swarm/cluster" - "github.com/gogo/protobuf/proto" - "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" -) - -// Task struct inherits from TaskInfo and represents a mesos task -type Task struct { - mesosproto.TaskInfo - - updates chan *mesosproto.TaskStatus - - config *cluster.ContainerConfig - Error chan error - container chan *cluster.Container - done bool -} - -// GetContainer returns the container channel from the task -// where the Swarm API sends the created container -func (t *Task) GetContainer() chan *cluster.Container { - return t.container -} - -// SetContainer writes on the container channel from the task -func (t *Task) SetContainer(container *cluster.Container) { - t.container <- container -} - -// GetConfig returns the container configuration of the task -func (t *Task) GetConfig() *cluster.ContainerConfig { - return t.config -} - -// ID method returns the taskId -func (t *Task) ID() string { - return t.TaskId.GetValue() -} - -// Stopped method returns a boolean determining if the task -// is done -func (t *Task) Stopped() bool { - return t.done -} - -// Stop method sets the boolean determining if the task is done -func (t *Task) Stop() { - t.done = true -} - -// Build method builds the task -func (t *Task) Build(slaveID string, offers map[string]*mesosproto.Offer) { - t.Command = &mesosproto.CommandInfo{Shell: proto.Bool(false)} - - t.Container = &mesosproto.ContainerInfo{ - Type: mesosproto.ContainerInfo_DOCKER.Enum(), - Docker: &mesosproto.ContainerInfo_DockerInfo{ - Image: &t.config.Image, - }, - } - - if t.config.Hostname != "" { - t.Container.Hostname = proto.String(t.config.Hostname) - if t.config.Domainname != "" { - t.Container.Hostname = proto.String(t.config.Hostname + "." + t.config.Domainname) - } - } - - switch t.config.HostConfig.NetworkMode { - case "none": - t.Container.Docker.Network = mesosproto.ContainerInfo_DockerInfo_NONE.Enum() - case "host": - t.Container.Docker.Network = mesosproto.ContainerInfo_DockerInfo_HOST.Enum() - case "default", "bridge", "": - var ports []uint64 - - for _, offer := range offers { - ports = append(ports, getPorts(offer)...) - } - - for containerProtoPort, bindings := range t.config.HostConfig.PortBindings { - for _, binding := range bindings { - containerInfo := strings.SplitN(string(containerProtoPort), "/", 2) - containerPort, err := strconv.ParseUint(containerInfo[0], 10, 32) - if err != nil { - log.Warn(err) - continue - } - - var hostPort uint64 - - if binding.HostPort != "" { - hostPort, err = strconv.ParseUint(binding.HostPort, 10, 32) - if err != nil { - log.Warn(err) - continue - } - } else if len(ports) > 0 { - hostPort = ports[0] - ports = ports[1:] - } - - if hostPort == 0 { - log.Warn("cannot find port to bind on the host") - continue - } - - protocol := "tcp" - if len(containerInfo) == 2 { - protocol = containerInfo[1] - } - t.Container.Docker.PortMappings = append(t.Container.Docker.PortMappings, &mesosproto.ContainerInfo_DockerInfo_PortMapping{ - HostPort: proto.Uint32(uint32(hostPort)), - ContainerPort: proto.Uint32(uint32(containerPort)), - Protocol: proto.String(protocol), - }) - t.Resources = append(t.Resources, mesosutil.NewRangesResource("ports", []*mesosproto.Value_Range{mesosutil.NewValueRange(hostPort, hostPort)})) - } - } - // TODO handle -P here - t.Container.Docker.Network = mesosproto.ContainerInfo_DockerInfo_BRIDGE.Enum() - default: - log.Errorf("Unsupported network mode %q", t.config.HostConfig.NetworkMode) - t.Container.Docker.Network = mesosproto.ContainerInfo_DockerInfo_BRIDGE.Enum() - } - - if cpus := t.config.HostConfig.CPUShares; cpus > 0 { - t.Resources = append(t.Resources, mesosutil.NewScalarResource("cpus", float64(cpus))) - } - - if mem := t.config.HostConfig.Memory; mem > 0 { - t.Resources = append(t.Resources, mesosutil.NewScalarResource("mem", float64(mem/1024/1024))) - } - - if len(t.config.Cmd) > 0 && t.config.Cmd[0] != "" { - t.Command.Value = &t.config.Cmd[0] - } - - if len(t.config.Cmd) > 1 { - t.Command.Arguments = t.config.Cmd[1:] - } - - for key, value := range t.config.Labels { - t.Container.Docker.Parameters = append(t.Container.Docker.Parameters, &mesosproto.Parameter{Key: proto.String("label"), Value: proto.String(fmt.Sprintf("%s=%s", key, value))}) - } - - for _, value := range t.config.Env { - t.Container.Docker.Parameters = append(t.Container.Docker.Parameters, &mesosproto.Parameter{Key: proto.String("env"), Value: proto.String(value)}) - } - - if !t.config.AttachStdin && !t.config.AttachStdout && !t.config.AttachStderr { - t.Container.Docker.Parameters = append(t.Container.Docker.Parameters, &mesosproto.Parameter{Key: proto.String("label"), Value: proto.String(fmt.Sprintf("%s=true", cluster.SwarmLabelNamespace+".mesos.detach"))}) - } - - t.SlaveId = &mesosproto.SlaveID{Value: &slaveID} -} - -// NewTask function creates a task -func NewTask(config *cluster.ContainerConfig, name string, timeout time.Duration) (*Task, error) { - id := stringid.TruncateID(stringid.GenerateRandomID()) - - if name != "" { - id = name + "." + id - } - // save the name in labels as the mesos containerizer will override it - config.Labels[cluster.SwarmLabelNamespace+".mesos.name"] = name - // FIXME: once Mesos changes merged no need to save the task id to know which container we launched - config.Labels[cluster.SwarmLabelNamespace+".mesos.task"] = id - - task := &Task{ - config: config, - container: make(chan *cluster.Container), - Error: make(chan error), - updates: make(chan *mesosproto.TaskStatus), - } - - task.Name = &name - task.TaskId = &mesosproto.TaskID{Value: &id} - task.Labels = &mesosproto.Labels{Labels: []*mesosproto.Label{{Key: proto.String("SWARM_CONTAINER_NAME"), Value: &name}}} - - go task.suicide(timeout) - - return task, nil -} - -func (t *Task) suicide(timeout time.Duration) { - <-time.After(timeout) - if !t.Stopped() && t.SlaveId == nil { - t.Error <- fmt.Errorf("container failed to start after %s", timeout) - } -} - -// SendStatus method writes the task status in the updates channel -func (t *Task) SendStatus(status *mesosproto.TaskStatus) { - t.updates <- status -} - -// GetStatus method reads the task status on the updates channel -func (t *Task) GetStatus() *mesosproto.TaskStatus { - return <-t.updates -} - -// Monitor method monitors task statuses -func (t *Task) Monitor() (bool, []byte, error) { - taskStatus := t.GetStatus() - - switch taskStatus.GetState() { - case mesosproto.TaskState_TASK_STAGING: - case mesosproto.TaskState_TASK_STARTING: - case mesosproto.TaskState_TASK_RUNNING: - case mesosproto.TaskState_TASK_FINISHED: - return true, taskStatus.Data, nil - case mesosproto.TaskState_TASK_FAILED: - errorMessage := taskStatus.GetMessage() - if strings.Contains(errorMessage, "Abnormal executor termination") { - errorMessage += " : please verify your SWARM_MESOS_USER is correctly set" - } - return true, nil, errors.New(errorMessage) - case mesosproto.TaskState_TASK_KILLED: - return true, taskStatus.Data, nil - case mesosproto.TaskState_TASK_LOST: - return true, nil, errors.New(taskStatus.GetMessage()) - case mesosproto.TaskState_TASK_ERROR: - return true, nil, errors.New(taskStatus.GetMessage()) - } - - return false, taskStatus.Data, nil -} diff --git a/cluster/mesos/task/task_test.go b/cluster/mesos/task/task_test.go deleted file mode 100644 index 78919537fd..0000000000 --- a/cluster/mesos/task/task_test.go +++ /dev/null @@ -1,79 +0,0 @@ -package task - -import ( - "sort" - "strings" - "testing" - "time" - - containertypes "github.com/docker/docker/api/types/container" - networktypes "github.com/docker/docker/api/types/network" - "github.com/docker/swarm/cluster" - "github.com/mesos/mesos-go/mesosproto" - "github.com/mesos/mesos-go/mesosutil" - "github.com/stretchr/testify/assert" -) - -const name = "mesos-swarm-task-name" - -var ( - containerConfig = containertypes.Config{ - Image: "test-image", - Cmd: []string{"ls", "foo", "bar"}, - } - hostConfig = containertypes.HostConfig{ - Resources: containertypes.Resources{ - CPUShares: 42, - Memory: 2097152, - }, - } - networkingConfig = networktypes.NetworkingConfig{} -) - -func TestBuild(t *testing.T) { - task, err := NewTask(cluster.BuildContainerConfig(containerConfig, hostConfig, networkingConfig), name, 5*time.Second) - assert.NoError(t, err) - - task.Build("slave-id", nil) - - assert.Equal(t, task.Container.GetType(), mesosproto.ContainerInfo_DOCKER) - assert.Equal(t, task.Container.Docker.GetImage(), "test-image") - assert.Equal(t, task.Container.Docker.GetNetwork(), mesosproto.ContainerInfo_DockerInfo_BRIDGE) - - assert.Equal(t, len(task.Resources), 2) - assert.Equal(t, task.Resources[0], mesosutil.NewScalarResource("cpus", 42.0)) - assert.Equal(t, task.Resources[1], mesosutil.NewScalarResource("mem", 2)) - - assert.Equal(t, task.Command.GetValue(), "ls") - assert.Equal(t, task.Command.GetArguments(), []string{"foo", "bar"}) - - parameters := []string{task.Container.Docker.GetParameters()[0].GetValue(), task.Container.Docker.GetParameters()[1].GetValue()} - sort.Strings(parameters) - - assert.Equal(t, len(parameters), 2) - assert.Equal(t, parameters[0], "com.docker.swarm.mesos.name="+name) - assert.Equal(t, parameters[1], "com.docker.swarm.mesos.task="+*task.TaskId.Value) - - assert.Equal(t, task.SlaveId.GetValue(), "slave-id") -} - -func TestNewTask(t *testing.T) { - task, err := NewTask(cluster.BuildContainerConfig(containertypes.Config{}, containertypes.HostConfig{}, networktypes.NetworkingConfig{}), name, 5*time.Second) - assert.NoError(t, err) - - assert.Equal(t, *task.Name, name) - assert.True(t, strings.HasPrefix(task.TaskId.GetValue(), name+".")) - assert.Equal(t, len(task.TaskId.GetValue()), len(name)+1+12) //+.+ -} - -func TestSendGetStatus(t *testing.T) { - task, err := NewTask(cluster.BuildContainerConfig(containertypes.Config{}, containertypes.HostConfig{}, networktypes.NetworkingConfig{}), "", 5*time.Second) - assert.NoError(t, err) - - status := mesosutil.NewTaskStatus(nil, mesosproto.TaskState_TASK_RUNNING) - - go func() { task.SendStatus(status) }() - s := task.GetStatus() - - assert.Equal(t, s, status) -} diff --git a/cluster/mesos/task/tasks.go b/cluster/mesos/task/tasks.go deleted file mode 100644 index a1f0c7c301..0000000000 --- a/cluster/mesos/task/tasks.go +++ /dev/null @@ -1,61 +0,0 @@ -package task - -import "sync" - -type launcher interface { - LaunchTask(t *Task) bool -} - -// Tasks is a simple map of tasks -type Tasks struct { - sync.Mutex - - cluster launcher - Tasks map[string]*Task -} - -// NewTasks returns a new tasks -func NewTasks(cluster launcher) *Tasks { - return &Tasks{ - Tasks: make(map[string]*Task), - cluster: cluster, - } - -} - -// Add tries to Do the Task, if it's not possible, add the Task to the tasks for future tries -func (t *Tasks) Add(task *Task) { - if !t.cluster.LaunchTask(task) { - t.Lock() - t.Tasks[task.ID()] = task - t.Unlock() - } -} - -// Remove a Task from the tasks -func (t *Tasks) Remove(tasks ...*Task) { - t.Lock() - t.remove(tasks...) - t.Unlock() -} - -// Process tries to Do all the Tasks in the tasks and remove the Tasks successfully done -func (t *Tasks) Process() { - t.Lock() - toRemove := []*Task{} - for _, task := range t.Tasks { - if t.cluster.LaunchTask(task) { - toRemove = append(toRemove, task) - } - } - - t.remove(toRemove...) - t.Unlock() -} - -func (t *Tasks) remove(tasks ...*Task) { - for _, task := range tasks { - task.Stop() - delete(t.Tasks, task.ID()) - } -} diff --git a/cluster/mesos/task/tasks_test.go b/cluster/mesos/task/tasks_test.go deleted file mode 100644 index fa41be2f52..0000000000 --- a/cluster/mesos/task/tasks_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package task - -import ( - "testing" - "time" - - "github.com/docker/swarm/cluster" - "github.com/stretchr/testify/assert" -) - -type testLauncher struct { - count int -} - -func (t *testLauncher) LaunchTask(_ *Task) bool { - t.count = t.count - 1 - return t.count == 0 -} - -func TestAdd(t *testing.T) { - q := NewTasks(&testLauncher{count: 1}) - - task1, _ := NewTask(cluster.BuildContainerConfig(containerConfig, hostConfig, networkingConfig), "name1", 5*time.Second) - - task2, _ := NewTask(cluster.BuildContainerConfig(containerConfig, hostConfig, networkingConfig), "name2", 5*time.Second) - q.Add(task1) - assert.Equal(t, len(q.Tasks), 0) - - q.Add(task2) - assert.Equal(t, len(q.Tasks), 1) - -} - -func TestRemove(t *testing.T) { - q := NewTasks(&testLauncher{count: 2}) - task1, _ := NewTask(cluster.BuildContainerConfig(containerConfig, hostConfig, networkingConfig), "name1", 5*time.Second) - - q.Add(task1) - assert.Equal(t, len(q.Tasks), 1) - q.Remove(task1) - assert.Equal(t, len(q.Tasks), 0) - -} - -func TestProcess(t *testing.T) { - q := NewTasks(&testLauncher{count: 3}) - task1, _ := NewTask(cluster.BuildContainerConfig(containerConfig, hostConfig, networkingConfig), "name1", 5*time.Second) - - q.Add(task1) - assert.Equal(t, len(q.Tasks), 1) - q.Process() - assert.Equal(t, len(q.Tasks), 1) - q.Process() - assert.Equal(t, len(q.Tasks), 0) - -} diff --git a/cluster/mesos/task/utils.go b/cluster/mesos/task/utils.go deleted file mode 100644 index 1a8d4a79f4..0000000000 --- a/cluster/mesos/task/utils.go +++ /dev/null @@ -1,16 +0,0 @@ -package task - -import "github.com/mesos/mesos-go/mesosproto" - -func getPorts(offer *mesosproto.Offer) (ports []uint64) { - for _, resource := range offer.Resources { - if resource.GetName() == "ports" { - for _, rang := range resource.GetRanges().GetRange() { - for i := rang.GetBegin(); i <= rang.GetEnd(); i++ { - ports = append(ports, i) - } - } - } - } - return ports -} diff --git a/cluster/mesos/utils.go b/cluster/mesos/utils.go deleted file mode 100644 index 9055a3e980..0000000000 --- a/cluster/mesos/utils.go +++ /dev/null @@ -1,39 +0,0 @@ -package mesos - -import ( - "fmt" - "strings" - - "github.com/docker/go-units" - "github.com/mesos/mesos-go/mesosproto" -) - -func formatResource(resource *mesosproto.Resource) string { - switch resource.GetType() { - case mesosproto.Value_SCALAR: - if resource.GetName() == "disk" || resource.GetName() == "mem" { - return units.BytesSize(resource.GetScalar().GetValue() * 1024 * 1024) - } - return fmt.Sprintf("%d", int(resource.GetScalar().GetValue())) - - case mesosproto.Value_RANGES: - var ranges []string - for _, r := range resource.GetRanges().GetRange() { - ranges = append(ranges, fmt.Sprintf("%d-%d", r.GetBegin(), r.GetEnd())) - } - return strings.Join(ranges, ", ") - } - return "?" -} - -func sumScalarResourceValue(offers map[string]*mesosproto.Offer, name string) float64 { - var value float64 - for _, offer := range offers { - for _, resource := range offer.Resources { - if *resource.Name == name { - value += *resource.Scalar.Value - } - } - } - return value -}