Skip to content

Commit

Permalink
support multiple agents
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf committed Oct 10, 2015
1 parent 6030360 commit 72c850a
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 10 deletions.
2 changes: 2 additions & 0 deletions agent/agent_server_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func (as *AgentServer) handleStart(conn net.Conn,

cmd.Wait()

// log.Printf("Finish command %v", cmd)

return reply
}

Expand Down
3 changes: 3 additions & 0 deletions agent/agent_server_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

func (as *AgentServer) handleWriteConnection(r io.Reader, name string) {

as.name2StoreLock.Lock()
ds, ok := as.name2Store[name]
if !ok {
Expand Down Expand Up @@ -48,9 +49,11 @@ func (as *AgentServer) handleWriteConnection(r io.Reader, name string) {
}

func (as *AgentServer) handleDelete(name string) {

as.name2StoreLock.Lock()
ds, ok := as.name2Store[name]
if !ok {
as.name2StoreLock.Unlock()
return
}

Expand Down
4 changes: 2 additions & 2 deletions driver/network_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type NetworkContext struct {
var networkContext NetworkContext

func init() {
flag.IntVar(&networkContext.AgentPort, "task.agent.port", 8931, "agent port")
flag.StringVar(&networkContext.LeaderAddress, "task.leader.address", "localhost:8930", "leader address, as host:port")
flag.IntVar(&networkContext.AgentPort, "glow.agent.port", 8931, "agent port")
flag.StringVar(&networkContext.LeaderAddress, "glow.leader.address", "localhost:8930", "leader address, as host:port")
}

func GetSendChannel(name string, wg *sync.WaitGroup) (chan []byte, error) {
Expand Down
12 changes: 6 additions & 6 deletions driver/scheduler/market/cda_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,25 @@ func (m *Market) AddDemand(r Requirement, bid int, retChan chan Supply) {
}

func (m *Market) FetcherLoop() {
m.Lock.Lock()
defer m.Lock.Unlock()

for {
m.Lock.Lock()
for len(m.Demands) == 0 {
m.hasDemands.Wait()
}
m.Lock.Unlock()

m.FetchFn(m.Demands)
}
}

func (m *Market) ReturnSupply(s Supply) {
m.Lock.Lock()
defer m.Lock.Unlock()

m.AddSupply(s)
}

func (m *Market) AddSupply(supply Supply) {
m.Lock.Lock()
defer m.Lock.Unlock()

if len(m.Demands) > 0 {
demand := m.pickBestDemandFor(supply)
demand.ReturnChan <- supply
Expand Down
14 changes: 12 additions & 2 deletions driver/scheduler/scheduler_events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package scheduler

import (
// "fmt"
"log"
"os"
"strconv"
"sync"
Expand Down Expand Up @@ -55,6 +57,8 @@ func (s *Scheduler) EventLoop() {
s.datasetShard2LocationLock.Unlock()
}

// fmt.Printf("allocated %s on %v\n", tasks[0].Name(), allocation.Location)

dir, _ := os.Getwd()
args := []string{
"-glow.context.id",
Expand All @@ -63,15 +67,20 @@ func (s *Scheduler) EventLoop() {
strconv.Itoa(taskGroup.Id),
"-glow.task.name",
tasks[0].Name(),
"-glow.agent.port",
strconv.Itoa(allocation.Location.Port),
"-glow.leader.address",
s.Leader,
}
for _, arg := range os.Args[1:] {
args = append(args, arg)
}
request := NewStartRequest(os.Args[0], dir, args, allocation.Allocated)
// fmt.Printf("starting on %s: %v\n", server, request)
// fmt.Printf("starting on %s: %v\n", allocation.Allocated, request)
if err := RemoteDirectExecute(allocation.Location.URL(), request); err != nil {
println("exeuction error:", err.Error())
log.Printf("exeuction error %v: %v", err, request)
} else {
// fmt.Printf("Closing and returning resources on %s: %v\n", allocation.Allocated, request)
s.Market.ReturnSupply(supply)
}
}()
Expand All @@ -90,6 +99,7 @@ func (s *Scheduler) EventLoop() {
}
}
}

}()
}
}
Expand Down

0 comments on commit 72c850a

Please sign in to comment.