Skip to content

Commit

Permalink
Leave cluster RPC command (distribworks#496)
Browse files Browse the repository at this point in the history
Leave cluster RPC command

Implement the command to leave a cluster using an agent command and calling the leave method by RPC to the local or remote agent.
  • Loading branch information
Victor Castell authored Feb 21, 2019
1 parent caaf783 commit d7b1eaa
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 97 deletions.
2 changes: 1 addition & 1 deletion cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ WAIT:
log.Info("agent: Gracefully shutting down agent...")
go func() {
plugin.CleanupClients()
if err := agent.Leave(); err != nil {
if err := agent.Stop(); err != nil {
fmt.Printf("Error: %s", err)
log.Error(fmt.Sprintf("Error: %s", err))
return
Expand Down
35 changes: 35 additions & 0 deletions cmd/leave.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package cmd

import (
"github.com/spf13/cobra"
"github.com/victorcoder/dkron/dkron"
)

var rpcAddr string

// versionCmd represents the version command
var leaveCmd = &cobra.Command{
Use: "leave",
Short: "Force an agent to leave the cluster",
Long: `Stop stops an agent, if the agent is a server and is running for election
stop running for election, if this server was the leader
this will force the cluster to elect a new leader and start a new scheduler.
If this is a server and has the scheduler started stop it, ignoring if this server
was participating in leader election or not (local storage).
Then actually leave the cluster.`,
RunE: func(cmd *cobra.Command, args []string) error {
var gc dkron.DkronGRPCClient
gc = dkron.NewGRPCClient(nil)

if err := gc.Leave(rpcAddr); err != nil {
return err
}

return nil
},
}

func init() {
dkronCmd.AddCommand(leaveCmd)
leaveCmd.PersistentFlags().StringVar(&rpcAddr, "rpc-addr", "127.0.0.1:6868", "gRPC address of the agent")
}
18 changes: 13 additions & 5 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,23 @@ func (a *Agent) Start() error {
return nil
}

// Stop stops an agent, if the agent is a server and is running for election
// stop running for election, if this server was the leader
// this will force the cluster to elect a new leader and start a new scheduler.
// If this is a server and has the scheduler started stop it, ignoring if this server
// was participating in leader election or not (local storage).
// Then actually leave the cluster.
func (a *Agent) Stop() error {
if a.config.Server {
log.Info("agent: Called member stop, now stopping")

if a.config.Server && a.candidate != nil {
a.candidate.Stop()
}

if a.config.Server && a.sched.Started {
a.sched.Stop()
}

if err := a.serf.Leave(); err != nil {
return err
}
Expand Down Expand Up @@ -595,10 +607,6 @@ func (a *Agent) getRPCAddr() string {
return fmt.Sprintf("%s:%d", bindIP, a.config.AdvertiseRPCPort)
}

func (a *Agent) Leave() error {
return a.serf.Leave()
}

func (a *Agent) SetTags(tags map[string]string) error {
if a.config.Server {
tags["dkron_rpc_addr"] = a.getRPCAddr()
Expand Down
2 changes: 1 addition & 1 deletion dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (h *HTTPTransport) leaderHandler(c *gin.Context) {
}

func (h *HTTPTransport) leaveHandler(c *gin.Context) {
if err := h.agent.serf.Leave(); err != nil {
if err := h.agent.Stop(); err != nil {
renderJSON(c, http.StatusOK, h.agent.listServers())
}
}
Expand Down
44 changes: 41 additions & 3 deletions dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/abronan/valkeyrie/store"
metrics "github.com/armon/go-metrics"
"github.com/golang/protobuf/ptypes/empty"
"github.com/sirupsen/logrus"
"github.com/victorcoder/dkron/proto"
"golang.org/x/net/context"
Expand Down Expand Up @@ -189,26 +190,36 @@ retry:
return execDoneResp, nil
}

func (grpcs *GRPCServer) Leave(ctx context.Context, in *empty.Empty) (*empty.Empty, error) {
return in, grpcs.agent.Stop()
}

type DkronGRPCClient interface {
Connect(string) (*grpc.ClientConn, error)
CallExecutionDone(string, *Execution) error
CallGetJob(string, string) (*Job, error)
Leave(string) error
}

type GRPCClient struct {
dialOpt grpc.DialOption
dialOpt []grpc.DialOption
}

func NewGRPCClient(dialOpt grpc.DialOption) DkronGRPCClient {
if dialOpt == nil {
dialOpt = grpc.WithInsecure()
}
return &GRPCClient{dialOpt: dialOpt}
return &GRPCClient{dialOpt: []grpc.DialOption{
dialOpt,
grpc.WithBlock(),
grpc.WithTimeout(5 * time.Second),
},
}
}

func (grpcc *GRPCClient) Connect(addr string) (*grpc.ClientConn, error) {
// Initiate a connection with the server
conn, err := grpc.Dial(addr, grpcc.dialOpt)
conn, err := grpc.Dial(addr, grpcc.dialOpt...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -267,3 +278,30 @@ func (grpcc *GRPCClient) CallGetJob(addr, jobName string) (*Job, error) {

return NewJobFromProto(gjr), nil
}

func (grpcc *GRPCClient) Leave(addr string) error {
var conn *grpc.ClientConn

// Initiate a connection with the server
conn, err := grpcc.Connect(addr)
if err != nil {
log.WithFields(logrus.Fields{
"err": err,
"server_addr": addr,
}).Error("grpc: error dialing.")
return err
}
defer conn.Close()

// Synchronous call
d := proto.NewDkronClient(conn)
_, err = d.Leave(context.Background(), &empty.Empty{})
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Warning("grpc: Error calling Leave")
return err
}

return nil
}
7 changes: 6 additions & 1 deletion dkron/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ func (a *Agent) invokeJob(job *Job, execution *Execution) error {
}

func (a *Agent) selectServer() serf.Member {
var server serf.Member

servers := a.listServers()
server := servers[rand.Intn(len(servers))]

if len(servers) > 0 {
server = servers[rand.Intn(len(servers))]
}

return server
}
Expand Down
133 changes: 84 additions & 49 deletions proto/dkron.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d7b1eaa

Please sign in to comment.