Skip to content

Commit

Permalink
Implement server selection using consistent hash (distribworks#522)
Browse files Browse the repository at this point in the history
* Implement server selection using consistent hash

Server selection via a consistent hash over peer members ensures a good request distribution and mimics a cache store.

* fix: Do not replay events on join. Replaying them could lead to unpredictable, out-of-time job executions
* refactor: Use GetExecutions in GetExecutionGroup
  • Loading branch information
Victor Castell authored May 1, 2019
1 parent 6b41d55 commit e8b5d55
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 89 deletions.
32 changes: 30 additions & 2 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
runningExecutions sync.Map
)

// Agent is the main struct that represents a dkron agent
type Agent struct {
ProcessorPlugins map[string]ExecutionProcessor
ExecutorPlugins map[string]Executor
Expand All @@ -45,6 +46,9 @@ type Agent struct {
GRPCServer DkronGRPCServer
GRPCClient DkronGRPCClient

// Set a global peer updater func
PeerUpdaterFunc func(...string)

serf *serf.Serf
config *Config
eventCh chan serf.Event
Expand Down Expand Up @@ -79,7 +83,7 @@ func (a *Agent) Start() error {
return fmt.Errorf("agent: Can not setup serf, %s", err)
}
a.serf = s
a.join(a.config.StartJoin, true)
a.join(a.config.StartJoin, false)

if err := initMetrics(a); err != nil {
log.Fatal("agent: Can not setup metrics")
Expand Down Expand Up @@ -396,7 +400,8 @@ func (a *Agent) leaderMember() (*serf.Member, error) {
return nil, ErrLeaderNotFound
}

func (a *Agent) listServers() []serf.Member {
// ListServers returns the list of server members
func (a *Agent) ListServers() []serf.Member {
members := []serf.Member{}

for _, member := range a.serf.Members() {
Expand All @@ -409,13 +414,31 @@ func (a *Agent) listServers() []serf.Member {
return members
}

// LocalMember returns the local serf member, as reported by the agent's
// underlying serf instance.
func (a *Agent) LocalMember() serf.Member {
return a.serf.LocalMember()
}

// GetBindIP reports the IP address the agent is bound to, derived from the
// configured BindAddr via Config.AddrParts.
// The result may differ from the address that was originally configured.
func (a *Agent) GetBindIP() (string, error) {
	cfg := a.config
	ip, _, err := cfg.AddrParts(cfg.BindAddr)
	return ip, err
}

// GetPeers collects the RPC addresses of the current serf server members.
// A member contributes its "dkron_rpc_addr" tag value; members without that
// tag are skipped. Each collected address is also logged at debug level.
// The result is nil when no server member carries the tag.
func (a *Agent) GetPeers() (peers []string) {
	for _, member := range a.ListServers() {
		rpcAddr, found := member.Tags["dkron_rpc_addr"]
		if !found {
			continue
		}
		peers = append(peers, rpcAddr)
		log.WithField("peer", rpcAddr).Debug("agent: updated peer")
	}

	return peers
}

// Listens to events from Serf and handle the event.
func (a *Agent) eventLoop() {
serfShutdownCh := a.serf.ShutdownCh()
Expand All @@ -437,6 +460,11 @@ func (a *Agent) eventLoop() {
"event": e.EventType(),
}).Debug("agent: Member event")
}

//In case of member event update peer list
if a.PeerUpdaterFunc != nil {
a.PeerUpdaterFunc(a.GetPeers()...)
}
}

if e.EventType() == serf.EventQuery {
Expand Down
2 changes: 1 addition & 1 deletion dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (h *HTTPTransport) leaderHandler(c *gin.Context) {

func (h *HTTPTransport) leaveHandler(c *gin.Context) {
if err := h.agent.Stop(); err != nil {
renderJSON(c, http.StatusOK, h.agent.listServers())
renderJSON(c, http.StatusOK, h.agent.ListServers())
}
}

Expand Down
31 changes: 10 additions & 21 deletions dkron/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package dkron

import (
"errors"
"math/rand"
"time"

"github.com/armon/circbuf"
"github.com/hashicorp/serf/serf"
"github.com/golang/groupcache/consistenthash"
)

const (
Expand Down Expand Up @@ -61,33 +60,23 @@ func (a *Agent) invokeJob(job *Job, execution *Execution) error {
execution.Success = success
execution.Output = output.Bytes()

rpcServer, err := a.getServerRPCAddresFromTags()
rpcServer, err := a.selectServerByKey(execution.Key())
if err != nil {
return err
}
log.WithField("server", rpcServer).Debug("invoke: Selected a server to send result")

runningExecutions.Delete(execution.GetGroup())

return a.GRPCClient.CallExecutionDone(rpcServer, execution)
}

func (a *Agent) selectServer() serf.Member {
var server serf.Member
// Select a server based on key using a consistent hash key
// like a cache store.
func (a *Agent) selectServerByKey(key string) (string, error) {
ch := consistenthash.New(50, nil)
ch.Add(a.GetPeers()...)
peerAddress := ch.Get(key)

servers := a.listServers()

if len(servers) > 0 {
server = servers[rand.Intn(len(servers))]
}

return server
}

func (a *Agent) getServerRPCAddresFromTags() (string, error) {
s := a.selectServer()

if addr, ok := s.Tags["dkron_rpc_addr"]; ok {
return addr, nil
}
return "", ErrNoRPCAddress
return peerAddress, nil
}
20 changes: 8 additions & 12 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,22 +385,17 @@ func (s *Store) GetLastExecutionGroup(jobName string) ([]*Execution, error) {
return executions, nil
}

// GetExecutionGroup returns all executions in the same group of a given execution
func (s *Store) GetExecutionGroup(execution *Execution) ([]*Execution, error) {
res, err := s.client.List(fmt.Sprintf("%s/executions/%s", s.keyspace, execution.JobName), nil)
res, err := s.GetExecutions(execution.JobName)
if err != nil {
return nil, err
}

var executions []*Execution
for _, node := range res {
var ex Execution
err := json.Unmarshal([]byte(node.Value), &ex)
if err != nil {
return nil, err
}

for _, ex := range res {
if ex.Group == execution.Group {
executions = append(executions, &ex)
executions = append(executions, ex)
}
}
return executions, nil
Expand Down Expand Up @@ -428,17 +423,17 @@ func (s *Store) GetGroupedExecutions(jobName string) (map[int64][]*Execution, []
return groups, byGroup, nil
}

// Save a new execution and returns the key of the new saved item or an error.
// SetExecution Save a new execution and returns the key of the new saved item or an error.
func (s *Store) SetExecution(execution *Execution) (string, error) {
exJson, _ := json.Marshal(execution)
exJSON, _ := json.Marshal(execution)
key := execution.Key()

log.WithFields(logrus.Fields{
"job": execution.JobName,
"execution": key,
}).Debug("store: Setting key")

err := s.client.Put(fmt.Sprintf("%s/executions/%s/%s", s.keyspace, execution.JobName, key), exJson, nil)
err := s.client.Put(fmt.Sprintf("%s/executions/%s/%s", s.keyspace, execution.JobName, key), exJSON, nil)
if err != nil {
log.WithFields(logrus.Fields{
"job": execution.JobName,
Expand All @@ -458,6 +453,7 @@ func (s *Store) SetExecution(execution *Execution) (string, error) {
// Delete all execution results over the limit, starting from olders
if len(execs) > MaxExecutions {
//sort the array of all execution groups by StartedAt time
// TODO: Use sort.Slice
sort.Sort(ExecList(execs))
for i := 0; i < len(execs)-MaxExecutions; i++ {
log.WithFields(logrus.Fields{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/gin-contrib/multitemplate v0.0.0-20170922032617-bbc6daf6024b
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7 // indirect
github.com/gin-gonic/gin v1.3.0
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff
github.com/golang/protobuf v1.2.0
github.com/gorilla/websocket v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
Expand Down
141 changes: 91 additions & 50 deletions proto/dkron.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e8b5d55

Please sign in to comment.