Skip to content

Commit

Permalink
Leader election eventing
Browse files Browse the repository at this point in the history
gRPC connection managers pick first address that works

Leadership change event handlers bugfixing

Control plane addresses for data planes and worker nodes

Registration server termination signals

Signed-off-by: Lazar Cvetković <[email protected]>
  • Loading branch information
cvetkovic committed Feb 21, 2024
1 parent 51a7324 commit e114edb
Show file tree
Hide file tree
Showing 19 changed files with 198 additions and 116 deletions.
11 changes: 7 additions & 4 deletions api/control_plane_api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type CpApiServerCreationArguments struct {
Cfg *config2.ControlPlaneConfig
}

func CreateNewCpApiServer(args *CpApiServerCreationArguments) (*CpApiServer, chan bool) {
func CreateNewCpApiServer(args *CpApiServerCreationArguments) (*CpApiServer, chan leader_election.AnnounceLeadership) {
cp := control_plane.NewControlPlane(
args.Client,
args.OutputFile,
Expand Down Expand Up @@ -67,7 +67,7 @@ func CreateNewCpApiServer(args *CpApiServerCreationArguments) (*CpApiServer, cha

// connecting to peers for leader election
for _, rawAddress := range args.Cfg.Replicas {
peerID := grpc_helpers.GetPeerPort(rawAddress)
_, peerID := grpc_helpers.SplitAddress(rawAddress)
tcpAddr, _ := net.ResolveTCPAddr("tcp", rawAddress)

leaderElectionServer.ConnectToPeer(peerID, tcpAddr)
Expand All @@ -87,16 +87,19 @@ func (c *CpApiServer) CleanControlPlaneInMemoryData(args *CpApiServerCreationArg
)
}

func (c *CpApiServer) StartNodeMonitoringLoop(stopCh chan struct{}) {
func (c *CpApiServer) StartNodeMonitoringLoop() chan struct{} {
if !c.LeaderElectionServer.IsLeader() {
logrus.Errorf("Cannot start node monitoring loop as this " +
"instance of control plane is currently not the leader. " +
"Probably lost leadership in the meanwhile.")

return
return nil
}

stopCh := make(chan struct{})
go c.ControlPlane.CheckPeriodicallyWorkerNodes(stopCh)

return stopCh
}

func (c *CpApiServer) OnMetricsReceive(ctx context.Context, metric *proto.AutoscalingMetric) (*proto.ActionStatus, error) {
Expand Down
3 changes: 1 addition & 2 deletions cmd/data_plane/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
dataPlaneIp: "127.0.0.1"
controlPlaneIp: "localhost"
controlPlanePort: "9090"
controlPlaneAddress: "127.0.0.1:9090,127.0.0.1:10000,127.0.0.1:10001"
portProxy: "8080"
portGRPC: "8081"
portProxyRead: "8082"
Expand Down
3 changes: 1 addition & 2 deletions cmd/data_plane/config_async.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
dataPlaneIp: "127.0.0.1"
controlPlaneIp: "localhost"
controlPlanePort: "9090"
controlPlaneAddress: "localhost:9090"
portProxy: "8080"
portGRPC: "8081"
portProxyRead: "8082"
Expand Down
3 changes: 1 addition & 2 deletions cmd/data_plane/config_cluster.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
dataPlaneIp: "dynamic"
controlPlaneIp: "10.0.1.2"
controlPlanePort: "9090"
controlPlaneAddress: "10.0.1.2:9090"
portProxy: "8080"
portProxyRead: "8082"
portGRPC: "8081"
Expand Down
1 change: 1 addition & 0 deletions cmd/master_node/config_cluster.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
port: "9090"
replicas: ""
portRegistration: "9091"
verbosity: "debug"
traceOutputFolder: "data"
Expand Down
42 changes: 22 additions & 20 deletions cmd/master_node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"cluster_manager/pkg/profiler"
"context"
"flag"
"net/http"
"os/signal"
"path"
"syscall"
Expand All @@ -41,7 +40,6 @@ func main() {
logger.SetupLogger(cfg.Verbosity)

var persistenceLayer persistence.PersistenceLayer

if cfg.Persistence {
persistenceLayer, err = persistence.CreateRedisClient(context.Background(), cfg.RedisConf)
if err != nil {
Expand Down Expand Up @@ -91,21 +89,32 @@ func main() {
/////////////////////////////////////////
// LEADERSHIP SPECIFIC
/////////////////////////////////////////
var registrationServer *http.Server
var stopNodeMonitoring chan struct{}
var stopRegistrationServer chan struct{}

wasLeaderBefore := false

for {
select {
case leader := <-isLeader:
if leader {
logrus.Infof("Proceeding as the leader for the current term...")
case leadership := <-isLeader:
if leadership.IsLeader && !wasLeaderBefore {
destroyStateFromPreviousElectionTerm(cpApiServer, cpApiCreationArgs, stopNodeMonitoring, stopRegistrationServer)
stopNodeMonitoring, stopRegistrationServer = nil, nil

destroyStateFromPreviousElectionTerm(cpApiServer, cpApiCreationArgs, registrationServer, stopNodeMonitoring)
ReconstructControlPlaneState(&cfg, cpApiServer)

cpApiServer.StartNodeMonitoringLoop(stopNodeMonitoring)
registrationServer = registration_server.StartServiceRegistrationServer(cpApiServer, cfg.PortRegistration)
stopNodeMonitoring = cpApiServer.StartNodeMonitoringLoop()
_, stopRegistrationServer = registration_server.StartServiceRegistrationServer(cpApiServer, cfg.PortRegistration, leadership.Term)

wasLeaderBefore = true
logrus.Infof("Proceeding as the leader for the term #%d...", leadership.Term)
} else {
if wasLeaderBefore {
destroyStateFromPreviousElectionTerm(cpApiServer, cpApiCreationArgs, stopNodeMonitoring, stopRegistrationServer)
stopNodeMonitoring, stopRegistrationServer = nil, nil
}
wasLeaderBefore = false

logrus.Infof("Another node was elected as the leader. Proceeding as a follower...")
}
case <-ctx.Done():
Expand All @@ -118,21 +127,14 @@ func main() {
/////////////////////////////////////////
}

func destroyStateFromPreviousElectionTerm(cpApiServer *api.CpApiServer, args *api.CpApiServerCreationArguments, registrationServer *http.Server, stopNodeMonitoring chan struct{}) {
if registrationServer != nil {
logrus.Infof("Shutting down function registration server from the previous leader's term.")
func destroyStateFromPreviousElectionTerm(cpApiServer *api.CpApiServer, args *api.CpApiServerCreationArguments,
stopNodeMonitoring chan struct{}, stopRegistrationServer chan struct{}) {

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

if err := registrationServer.Shutdown(ctx); err != nil {
logrus.Errorf("Failed to shut down function registration server.")
}
if stopRegistrationServer != nil {
stopRegistrationServer <- struct{}{}
}

if stopNodeMonitoring != nil {
logrus.Infof("Stopping node monitoring from the previous leader's term.")

stopNodeMonitoring <- struct{}{}
}

Expand Down
11 changes: 8 additions & 3 deletions cmd/simulator_workers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"flag"
"github.com/sirupsen/logrus"
"math/rand"
"net"
"os/signal"
"strconv"
"sync"
Expand All @@ -31,8 +32,7 @@ func main() {
{
cfg := config.DataPlaneConfig{
DataPlaneIp: "127.0.0.1",
ControlPlaneIp: "localhost",
ControlPlanePort: "9090",
ControlPlaneAddress: []string{"localhost:9090"},
PortProxy: "8080",
PortGRPC: "8081",
Verbosity: "trace",
Expand Down Expand Up @@ -85,7 +85,12 @@ func main() {

logger.SetupLogger(cfg.Verbosity)

cpApi, err := grpc_helpers.InitializeControlPlaneConnection(cfg.ControlPlaneIp, cfg.ControlPlanePort, "", -1, -1)
cpApi, err := grpc_helpers.InitializeControlPlaneConnection(
[]string{net.JoinHostPort(cfg.ControlPlaneIp, cfg.ControlPlanePort)},
"",
-1,
-1,
)
if err != nil {
logrus.Fatalf("Failed to initialize control plane connection (error : %s)", err.Error())
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/worker_node/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
workerNodeIp: "127.0.0.1"
controlPlaneIp: "localhost"
controlPlanePort: "9090"
controlPlaneAddress: "127.0.0.1:9090,127.0.0.1:10000,127.0.0.1:10001"
port: "10010"
verbosity: "trace"
criType: "containerd"
Expand Down
3 changes: 1 addition & 2 deletions cmd/worker_node/config_cluster.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
workerNodeIp: "dynamic"
controlPlaneIp: "10.0.1.2"
controlPlanePort: "9090"
controlPlaneAddress: "127.0.0.1:9090,127.0.0.1:10000,127.0.0.1:10001"
port: "10010"
verbosity: "trace"
criType: "containerd"
Expand Down
3 changes: 1 addition & 2 deletions cmd/worker_node/config_cluster_containerd.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
workerNodeIp: "dynamic"
controlPlaneIp: "10.0.1.2"
controlPlanePort: "9090"
controlPlaneAddress: "127.0.0.1:9090,127.0.0.1:10000,127.0.0.1:10001"
port: "10010"
verbosity: "trace"
criType: "containerd"
Expand Down
3 changes: 1 addition & 2 deletions cmd/worker_node/config_cluster_fake_worker.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
workerNodeIp: "dynamic"
controlPlaneIp: "10.0.1.2"
controlPlanePort: "9090"
controlPlaneAddress: "127.0.0.1:9090,127.0.0.1:10000,127.0.0.1:10001"
port: "$DAEMON_PORT"
verbosity: "trace"
criType: "scalability_test"
Expand Down
8 changes: 6 additions & 2 deletions cmd/worker_node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ func main() {
cfg.WorkerNodeIP = network.GetLocalIP()
}

cpApi, err := grpc_helpers.InitializeControlPlaneConnection(cfg.ControlPlaneIp, cfg.ControlPlanePort, "", -1, -1)
cpApi, err := grpc_helpers.InitializeControlPlaneConnection(
cfg.ControlPlaneAddress,
"",
-1,
-1,
)
if err != nil {
logrus.Fatalf("Failed to initialize control plane connection (error : %s)", err.Error())
}
Expand Down Expand Up @@ -69,7 +74,6 @@ func main() {
firecracker.DeleteAllSnapshots()
err = firecracker.DeleteUnusedNetworkDevices()
if err != nil {

logrus.Warn("Interruption received, but failed to delete leftover network devices.")
}
}
Expand Down
5 changes: 4 additions & 1 deletion internal/control_plane/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ func (c *ControlPlane) CheckPeriodicallyWorkerNodes(stopCh chan struct{}) {
// the following call requires a lock on NIStorage
c.HandleFailure(events)
case <-stopCh:
break
logrus.Infof("Stopping node monitoring from the previous leader's term.")
close(stopCh)

return
}
}
}
Expand Down
29 changes: 19 additions & 10 deletions internal/control_plane/leader_election/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ type ConsensusModule struct {

// Dirigent-specific
currentLeaderID int32
announceLeadership chan bool
announceLeadership chan AnnounceLeadership
}

// NewConsensusModule creates a new CM with the given ID, list of peer IDs and
// server. The ready channel signals the CM that all peers are connected and
// it's safe to start its state machine.
func NewConsensusModule(id int32, peerIds []int, server *LeaderElectionServer, ready <-chan interface{}, announceLeadership chan bool) *ConsensusModule {
func NewConsensusModule(id int32, peerIds []int, server *LeaderElectionServer, ready <-chan interface{}, announceLeadership chan AnnounceLeadership) *ConsensusModule {
cm := new(ConsensusModule)
cm.id = id
cm.peerIds = peerIds
Expand All @@ -88,7 +88,10 @@ func NewConsensusModule(id int32, peerIds []int, server *LeaderElectionServer, r
cm.announceLeadership = announceLeadership

if len(cm.peerIds) == 0 {
cm.announceLeadership <- true
cm.announceLeadership <- AnnounceLeadership{
IsLeader: true,
Term: int(cm.currentTerm),
}
cm.state = Leader
logrus.Infof("No peers found for the leader election. Proclaiming myself as the supreme leader...")
}
Expand Down Expand Up @@ -143,7 +146,7 @@ func (cm *ConsensusModule) RequestVote(args *proto.RequestVoteArgs) (*proto.Requ
logrus.Tracef("RequestVote: %+v [currentTerm=%d, votedFor=%d]", args, cm.currentTerm, cm.votedFor)

if args.Term > cm.currentTerm {
logrus.Debug("... term out of date in RequestVote")
logrus.Trace("... term out of date in RequestVote")
cm.becomeFollower(args.Term)
}

Expand Down Expand Up @@ -279,7 +282,7 @@ func (cm *ConsensusModule) startElection() {
savedCurrentTerm := cm.currentTerm
cm.electionResetEvent = time.Now()
cm.votedFor = cm.id
logrus.Debugf("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)
logrus.Debugf("Becomes Candidate for term #%d", savedCurrentTerm)

votesReceived := 1

Expand Down Expand Up @@ -318,7 +321,7 @@ func (cm *ConsensusModule) startElection() {
votesReceived += 1
if votesReceived*2 > len(cm.peerIds)+1 {
// Won the election!
logrus.Debugf("wins election with %d votes", votesReceived)
logrus.Debugf("Wins election for term #%d with %d votes", savedCurrentTerm, votesReceived)
cm.startLeader()
return
}
Expand All @@ -335,11 +338,14 @@ func (cm *ConsensusModule) startElection() {
// becomeFollower makes cm a follower and resets its state.
// Expects cm.mu to be locked.
func (cm *ConsensusModule) becomeFollower(term int32) {
logrus.Debugf("becomes Follower with term=%d; log=%v", term, cm.log)
logrus.Debugf("Becomes Follower for term #%d", term)

if cm.state == Leader {
// loosing leadership
cm.announceLeadership <- false
cm.announceLeadership <- AnnounceLeadership{
IsLeader: false,
Term: int(cm.currentTerm),
}
}

cm.state = Follower
Expand All @@ -354,8 +360,11 @@ func (cm *ConsensusModule) becomeFollower(term int32) {
// Expects cm.mu to be locked.
func (cm *ConsensusModule) startLeader() {
cm.state = Leader
cm.announceLeadership <- true
logrus.Debugf("becomes Leader; term=%d, log=%v", cm.currentTerm, cm.log)
cm.announceLeadership <- AnnounceLeadership{
IsLeader: true,
Term: int(cm.currentTerm),
}
logrus.Debugf("Becomes Leader for term #%d", cm.currentTerm)

go func() {
ticker := time.NewTicker(50 * time.Millisecond)
Expand Down
14 changes: 9 additions & 5 deletions internal/control_plane/leader_election/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
"time"
)

type AnnounceLeadership struct {
IsLeader bool
Term int
}

// LeaderElectionServer wraps a raft.ConsensusModule along with a rpc.Server that exposes its
// methods as RPC endpoints. It also manages the peers of the Raft server. The
// main goal of this type is to simplify the code of raft.LeaderElectionServer for
Expand All @@ -39,17 +44,17 @@ type LeaderElectionServer struct {
quit chan interface{}
wg sync.WaitGroup

announceLeadership chan bool
announceLeadership chan AnnounceLeadership
}

func NewServer(serverId int32, peerIds []int, ready <-chan interface{}) (*LeaderElectionServer, chan bool) {
func NewServer(serverId int32, peerIds []int, ready <-chan interface{}) (*LeaderElectionServer, chan AnnounceLeadership) {
s := new(LeaderElectionServer)
s.serverId = serverId
s.peerIds = peerIds
s.peerClients = make(map[int]proto.CpiInterfaceClient)
s.ready = ready
s.quit = make(chan interface{})
s.announceLeadership = make(chan bool, 1)
s.announceLeadership = make(chan AnnounceLeadership, 1)

///////////////
s.mu.Lock()
Expand Down Expand Up @@ -113,8 +118,7 @@ func (s *LeaderElectionServer) ConnectToPeer(peerId int, addr net.Addr) {
defer s.mu.Unlock()

if s.peerClients[peerId] == nil {
address, port, _ := net.SplitHostPort(addr.String())
conn := grpc_helpers.EstablishGRPCConnectionPoll(address, port)
conn := grpc_helpers.EstablishGRPCConnectionPoll([]string{addr.String()})
s.peerClients[peerId] = proto.NewCpiInterfaceClient(conn)
}
}
Expand Down
Loading

0 comments on commit e114edb

Please sign in to comment.