Skip to content

Commit

Permalink
HAProxy redundancy on all nodes with registrations server replicas
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Cvetković <[email protected]>
  • Loading branch information
cvetkovic authored and francois141 committed Mar 12, 2024
1 parent 79d14f9 commit 6b2134d
Show file tree
Hide file tree
Showing 22 changed files with 412 additions and 216 deletions.
39 changes: 29 additions & 10 deletions api/control_plane_api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func (c *CpApiServer) RegisterDataplane(ctx context.Context, in *proto.Dataplane
status, err, isHeartbeat := c.ControlPlane.RegisterDataplane(ctx, in)
if status.Success && err == nil && !isHeartbeat {
c.HAProxyAPI.AddDataplane(in.IP, int(in.ProxyPort), true)
c.DisseminateHAProxyConfig()
}

return status, err
Expand All @@ -240,24 +241,20 @@ func (c *CpApiServer) DeregisterDataplane(ctx context.Context, in *proto.Datapla
status, err := c.ControlPlane.DeregisterDataplane(ctx, in)
if status.Success && err == nil {
c.HAProxyAPI.RemoveDataplane(in.IP, int(in.ProxyPort), true)
c.DisseminateHAProxyConfig()
}

return status, err
}

func (c *CpApiServer) ReconstructState(ctx context.Context, config config2.ControlPlaneConfig) error {
func (c *CpApiServer) ReconstructState(ctx context.Context, config config2.ControlPlaneConfig, haProxyCallback func()) error {
if !c.LeaderElectionServer.IsLeader() {
// This API call is not exposed externally; it is invoked only on process startup
return errors.New("cannot request cluster state reconstruction if not the leader. " +
"Perhaps the leader has changed in the meanwhile")
}

haproxyFunction := func() {
c.ReviseHAProxyServers()
c.HAProxyAPI.RestartHAProxy()
}

return c.ControlPlane.ReconstructState(ctx, config, haproxyFunction)
return c.ControlPlane.ReconstructState(ctx, config, haProxyCallback)
}

func (c *CpApiServer) ResetMeasurements(ctx context.Context, empty *emptypb.Empty) (*proto.ActionStatus, error) {
Expand Down Expand Up @@ -301,7 +298,29 @@ func (c *CpApiServer) AppendEntries(_ context.Context, args *proto.AppendEntries
return c.LeaderElectionServer.AppendEntries(args)
}

func (c *CpApiServer) ReviseHAProxyServers() {
logrus.Infof("Revising HAProxy backend server list...")
c.ControlPlane.ReviseDataplanesInLB(c.HAProxyAPI.ReviseDataplanes)
// ReviseHAProxyConfiguration is the local handler for updating the HAProxy configuration.
// It is called by the remote leader to disseminate configuration. The dataplane and
// registration server lists carried in args are handed to the HAProxy API, and HAProxy
// is restarted only when at least one of the two revise calls reports true.
// The returned status always has Success set and the error is always nil.
func (c *CpApiServer) ReviseHAProxyConfiguration(_ context.Context, args *proto.HAProxyConfig) (*proto.ActionStatus, error) {
	// Evaluate both revisions up front so neither is skipped by short-circuiting.
	dataplanesRevised := c.HAProxyAPI.ReviseDataplanes(args.Dataplanes)
	registrationServersRevised := c.HAProxyAPI.ReviseRegistrationServers(args.RegistrationServers)

	if !dataplanesRevised && !registrationServersRevised {
		logrus.Info("HAProxy configuration revision done without restart!")
		return &proto.ActionStatus{Success: true}, nil
	}

	c.HAProxyAPI.RestartHAProxy()
	logrus.Info("HAProxy configuration revision done with restart!")

	return &proto.ActionStatus{Success: true}, nil
}

// DisseminateHAProxyConfig fetches the current HAProxy configuration from the control plane
// and sends it to every peer returned by the leader election server via the
// ReviseHAProxyConfiguration RPC.
//
// Dissemination is best effort: a failure for one peer is logged (including the underlying
// error) and the loop continues with the remaining peers.
func (c *CpApiServer) DisseminateHAProxyConfig() {
	newConfig := c.ControlPlane.GetHAProxyConfig()

	for peer, conn := range c.LeaderElectionServer.GetPeers() {
		// The reply carries no information needed here; only the error matters.
		if _, err := conn.ReviseHAProxyConfiguration(context.Background(), newConfig); err != nil {
			logrus.Errorf("Failed disseminating HAProxy configuration to peer #%d: %v", peer, err)
		}
	}
}
268 changes: 175 additions & 93 deletions api/proto/control_plane_interface.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions api/proto/control_plane_interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ message Failure {
repeated string SandboxIDs = 3;
}

// HAProxyConfig carries the backend lists used to revise an HAProxy configuration.
// It is the request message of the ReviseHAProxyConfiguration RPC.
message HAProxyConfig {
// Dataplane entries; the element format is not visible here (presumably "ip:port" — TODO confirm).
repeated string Dataplanes = 1;
// Registration server entries; presumably "ip:port" addresses — TODO confirm.
repeated string RegistrationServers = 2;
}

service CpiInterface {
rpc OnMetricsReceive(AutoscalingMetric) returns (ActionStatus);
rpc ListServices(google.protobuf.Empty) returns (ServiceList);
Expand All @@ -66,4 +71,7 @@ service CpiInterface {
// RAFT leader election
rpc RequestVote(RequestVoteArgs) returns (RequestVoteReply);
rpc AppendEntries(AppendEntriesArgs) returns (AppendEntriesReply);

// HAProxy
rpc ReviseHAProxyConfiguration(HAProxyConfig) returns (ActionStatus);
}
38 changes: 38 additions & 0 deletions api/proto/control_plane_interface_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/master_node/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
port: "9090"
replicas: ""
registrationServer: "127.0.0.1:9091"
registrationServerReplicas: ""
verbosity: "debug"
traceOutputFolder: "data"
placementPolicy: "random"
Expand Down
1 change: 1 addition & 0 deletions cmd/master_node/config_cluster.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
port: "9090"
replicas: ""
registrationServer: "10.0.1.2:9091"
registrationServerReplicas: ""
verbosity: "debug"
traceOutputFolder: "data"
placementPolicy: "random"
Expand Down
1 change: 1 addition & 0 deletions cmd/master_node/config_cluster_raft_1.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
port: "9090"
replicas: "10.0.1.3:10000,10.0.1.4:10001"
registrationServer: "10.0.1.2:9091"
registrationServerReplicas: "10.0.1.3:9091,10.0.1.4:9091"
verbosity: "debug"
traceOutputFolder: "data"
placementPolicy: "random"
Expand Down
1 change: 1 addition & 0 deletions cmd/master_node/config_cluster_raft_2.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
port: "10000"
replicas: "10.0.1.2:9090,10.0.1.4:10001"
registrationServer: "10.0.1.3:9091"
registrationServerReplicas: "10.0.1.2:9091,10.0.1.4:9091"
verbosity: "debug"
traceOutputFolder: "data"
placementPolicy: "random"
Expand Down
1 change: 1 addition & 0 deletions cmd/master_node/config_cluster_raft_3.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
port: "10001"
replicas: "10.0.1.2:9090,10.0.1.3:10000"
registrationServer: "10.0.1.4:9091"
registrationServerReplicas: "10.0.1.2:9091,10.0.1.3:9091"
verbosity: "debug"
traceOutputFolder: "data"
placementPolicy: "random"
Expand Down
5 changes: 3 additions & 2 deletions cmd/master_node/config_raft_1.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
port: "9090"
replicas: "127.0.0.1:10000,127.0.0.1:10001"
registrationServer: "127.0.0.1:9091"
registrationServer: "127.0.0.1:13091"
registrationServerReplicas: "127.0.0.1:14091,127.0.0.1:15091"
verbosity: "debug"
traceOutputFolder: "data"
placementPolicy: "random"
persistence: true
reconstruct: true

# to use HAProxy load balancer put 127.0.0.1:8079, otherwise leave blank
loadBalancerAddress: ""
loadBalancerAddress: "127.0.0.1:8079"

removeWorkerNode: false
removeDataplane: false
Expand Down
5 changes: 3 additions & 2 deletions cmd/master_node/config_raft_2.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
port: "10000"
replicas: "127.0.0.1:9090,127.0.0.1:10001"
registrationServer: "127.0.0.1:9091"
registrationServer: "127.0.0.1:14091"
registrationServerReplicas: "127.0.0.1:13091,127.0.0.1:15091"
verbosity: "debug"
traceOutputFolder: "data"
placementPolicy: "random"
persistence: true
reconstruct: true

# to use HAProxy load balancer put 127.0.0.1:8079, otherwise leave blank
loadBalancerAddress: ""
loadBalancerAddress: "127.0.0.1:8079"

removeWorkerNode: false
removeDataplane: false
Expand Down
5 changes: 3 additions & 2 deletions cmd/master_node/config_raft_3.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
port: "10001"
replicas: "127.0.0.1:9090,127.0.0.1:10000"
registrationServer: "127.0.0.1:9091"
registrationServer: "127.0.0.1:15091"
registrationServerReplicas: "127.0.0.1:13091,127.0.0.1:14091"
verbosity: "debug"
traceOutputFolder: "data"
placementPolicy: "random"
persistence: true
reconstruct: true

# to use HAProxy load balancer put 127.0.0.1:8079, otherwise leave blank
loadBalancerAddress: ""
loadBalancerAddress: "127.0.0.1:8079"

removeWorkerNode: false
removeDataplane: false
Expand Down
16 changes: 16 additions & 0 deletions cmd/master_node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"cluster_manager/internal/control_plane/data_plane"
"cluster_manager/internal/control_plane/data_plane/empty_dataplane"
"cluster_manager/internal/control_plane/persistence"
"cluster_manager/internal/control_plane/registration_server"
"cluster_manager/internal/control_plane/workers"
"cluster_manager/internal/control_plane/workers/empty_worker"
"cluster_manager/pkg/config"
"cluster_manager/pkg/logger"
"cluster_manager/pkg/profiler"
"context"
"flag"
"net"
_ "net/http/pprof"
"os/signal"
"path"
Expand Down Expand Up @@ -72,6 +74,9 @@ func main() {
}

cpApiServer, isLeader := api.CreateNewCpApiServer(cpApiCreationArgs)
stopRegistrationServer := registration_server.StartServiceRegistrationServer(cpApiServer, getRegistrationPort(&cfg))

cpApiServer.HAProxyAPI.StartHAProxy()

go profiler.SetupProfilerServer(cfg.Profiler)

Expand All @@ -88,8 +93,19 @@ func main() {
case leadership := <-isLeader:
electionState.UpdateLeadership(leadership)
case <-ctx.Done():
stopRegistrationServer <- struct{}{}

logrus.Info("Received interruption signal, try to gracefully stop")
return
}
}
}

// getRegistrationPort extracts the port component from cfg.RegistrationServer, which is
// expected to be a "host:port" address accepted by net.SplitHostPort.
//
// If the address cannot be split, the process is terminated via logrus.Fatalf; the
// message includes the offending address and the parse error to ease diagnosis.
func getRegistrationPort(cfg *config.ControlPlaneConfig) string {
	_, registrationPort, err := net.SplitHostPort(cfg.RegistrationServer)
	if err != nil {
		logrus.Fatalf("Invalid registration server address %q: %v", cfg.RegistrationServer, err)
	}

	return registrationPort
}
Loading

0 comments on commit 6b2134d

Please sign in to comment.