Skip to content

Commit

Permalink
Registration server pointing to LB or DP directly
Browse files Browse the repository at this point in the history
Heartbeat logging bugfixes

Signed-off-by: Lazar Cvetković <[email protected]>
  • Loading branch information
cvetkovic committed Mar 5, 2024
1 parent 7fc4688 commit d26b4bd
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 36 deletions.
7 changes: 4 additions & 3 deletions api/control_plane_api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func CreateNewCpApiServer(args *CpApiServerCreationArguments) (*CpApiServer, cha
cpApiServer := &CpApiServer{
LeaderElectionServer: leaderElectionServer,
ControlPlane: cp,
HAProxyAPI: haproxy.NewHAProxyAPI(),
HAProxyAPI: haproxy.NewHAProxyAPI(args.Cfg.LoadBalancerAddress),
}

go grpc_helpers.CreateGRPCServer(args.Cfg.Port, func(sr grpc.ServiceRegistrar) {
Expand Down Expand Up @@ -218,7 +218,7 @@ func (c *CpApiServer) RegisterDataplane(ctx context.Context, in *proto.Dataplane
}

status, err := c.ControlPlane.RegisterDataplane(ctx, in)
if status.Success && err != nil {
if status.Success && err == nil {
c.HAProxyAPI.AddDataplane(in.IP, int(in.ProxyPort))
}

Expand All @@ -238,7 +238,7 @@ func (c *CpApiServer) DeregisterDataplane(ctx context.Context, in *proto.Datapla
}

status, err := c.ControlPlane.DeregisterDataplane(ctx, in)
if status.Success && err != nil {
if status.Success && err == nil {
c.HAProxyAPI.RemoveDataplane(in.IP, int(in.ProxyPort))
}

Expand Down Expand Up @@ -297,5 +297,6 @@ func (c *CpApiServer) AppendEntries(_ context.Context, args *proto.AppendEntries
}

func (c *CpApiServer) ReviseHAProxyServers() {
logrus.Infof("Revising HAProxy backend server list...")
c.ControlPlane.ReviseDataplanesInLB(c.HAProxyAPI.ReviseDataplanes)
}
2 changes: 2 additions & 0 deletions cmd/master_node/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ placementPolicy: "random"
persistence: false
reconstruct: false

loadBalancerAddress: ""

removeWorkerNode: false
removeDataplane: false

Expand Down
2 changes: 2 additions & 0 deletions cmd/master_node/config_cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ placementPolicy: "random"
persistence: false
reconstruct: false

loadBalancerAddress: ""

removeWorkerNode: false
removeDataplane: false

Expand Down
2 changes: 2 additions & 0 deletions cmd/master_node/config_cluster_raft_1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ placementPolicy: "random"
persistence: true
reconstruct: true

loadBalancerAddress: ""

removeWorkerNode: false
removeDataplane: false

Expand Down
2 changes: 2 additions & 0 deletions cmd/master_node/config_cluster_raft_2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ placementPolicy: "random"
persistence: true
reconstruct: true

loadBalancerAddress: ""

removeWorkerNode: false
removeDataplane: false

Expand Down
2 changes: 2 additions & 0 deletions cmd/master_node/config_cluster_raft_3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ placementPolicy: "random"
persistence: true
reconstruct: true

loadBalancerAddress: ""

removeWorkerNode: false
removeDataplane: false

Expand Down
2 changes: 2 additions & 0 deletions cmd/master_node/config_raft_1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ placementPolicy: "random"
persistence: true
reconstruct: true

loadBalancerAddress: ""

removeWorkerNode: false
removeDataplane: false

Expand Down
2 changes: 2 additions & 0 deletions cmd/master_node/config_raft_2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ placementPolicy: "random"
persistence: true
reconstruct: true

loadBalancerAddress: ""

removeWorkerNode: false
removeDataplane: false

Expand Down
2 changes: 2 additions & 0 deletions cmd/master_node/config_raft_3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ placementPolicy: "random"
persistence: true
reconstruct: true

loadBalancerAddress: ""

removeWorkerNode: false
removeDataplane: false

Expand Down
8 changes: 4 additions & 4 deletions configs/haproxy.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ defaults
timeout check 10s

frontend dirigent
bind *:8080
mode tcp
bind *:8079
mode http
option tcplog
default_backend dirigent_data_planes

backend dirigent_data_planes
option httpchk GET /health
http-check expect status 200
mode tcp
mode http
balance hdr(host)
server dp_1 127.0.0.1:8080 check
#server dp_1 127.0.0.1:8080 check
28 changes: 18 additions & 10 deletions internal/control_plane/registration_server/registration_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,24 @@ func registrationHandler(cpApi *api.CpApiServer) func(w http.ResponseWriter, r *
return
}

endpointList := ""
_, err = w.Write([]byte(GetLoadBalancerAddress(cpApi)))
if err != nil {
http.Error(w, "Error writing endpoints.", http.StatusInternalServerError)
return
}

logrus.Debugf("Successfully registered function %s.", name)
}
}

func GetLoadBalancerAddress(cpApi *api.CpApiServer) string {
if cpApi.HAProxyAPI.GetLoadBalancerAddress() == "" {
endpointList := ""
cnt := 0

cpApi.ControlPlane.DataPlaneConnections.RLock()
defer cpApi.ControlPlane.DataPlaneConnections.RUnlock()

for _, conn := range cpApi.ControlPlane.DataPlaneConnections.GetMap() {
setDelimiter := cnt != cpApi.ControlPlane.DataPlaneConnections.Len()-1
delimiter := ""
Expand All @@ -137,18 +151,12 @@ func registrationHandler(cpApi *api.CpApiServer) func(w http.ResponseWriter, r *
}

endpointList += fmt.Sprintf("%s:%s%s", conn.GetIP(), conn.GetProxyPort(), delimiter)

cnt++
}
cpApi.ControlPlane.DataPlaneConnections.RUnlock()

_, err = w.Write([]byte(endpointList))
if err != nil {
http.Error(w, "Error writing endpoints.", http.StatusInternalServerError)
return
}

logrus.Debugf("Successfully registered function %s.", name)
return endpointList
} else {
return cpApi.HAProxyAPI.GetLoadBalancerAddress()
}
}

Expand Down
17 changes: 12 additions & 5 deletions internal/data_plane/haproxy/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@ const (
)

type API struct {
client *clientnative.HAProxyClient
client *clientnative.HAProxyClient
lbAddress string

// addressTo - e.g., 10.0.1.2:8080 -> dataplane_123
addressToName map[string]string
mutex sync.Mutex
}

func NewHAProxyAPI() *API {
func NewHAProxyAPI(loadBalancerAddress string) *API {
config := getConfigClient()
runtime := getRuntimeClient(config)

api := &API{
client: getHAProxyClient(config, runtime),
client: getHAProxyClient(config, runtime),
lbAddress: loadBalancerAddress,

addressToName: make(map[string]string),
}

Expand All @@ -42,6 +45,10 @@ func NewHAProxyAPI() *API {
return api
}

func (api *API) GetLoadBalancerAddress() string {
return api.lbAddress
}

func (api *API) ListDataplanes() (names []string, addresses []string) {
_, servers, err := api.client.Configuration.GetServers(BackendName, "")
if err != nil {
Expand Down Expand Up @@ -84,11 +91,11 @@ func (api *API) addServer(ipAddress string, port int, transaction string, versio
defer api.mutex.Unlock()

if _, ok := api.addressToName[url]; ok {
logrus.Errorf("A server with the same network address already exists.")
return
} else {
api.persistServerMetadata(name, ipAddress, port, transaction, version)
api.addressToName[url] = name
logrus.Infof("Added data plane with address %s:%d to HAProxy backend.", ipAddress, port)
}
}

Expand Down Expand Up @@ -116,7 +123,6 @@ func (api *API) removeServerByName(ipAddress string, port int, transaction strin
defer api.mutex.Unlock()

if dataplaneName, ok := api.addressToName[url]; !ok {
logrus.Errorf("Cannot delete requested server as it does not exist.")
return
} else {
err := api.client.Configuration.DeleteServer(dataplaneName, BackendName, transaction, version)
Expand All @@ -125,6 +131,7 @@ func (api *API) removeServerByName(ipAddress string, port int, transaction strin
}

delete(api.addressToName, url)
logrus.Infof("Removed data plane with address %s:%d to HAProxy backend.", ipAddress, port)
}
}

Expand Down
4 changes: 3 additions & 1 deletion internal/data_plane/haproxy/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
)

func TestEditBackend(t *testing.T) {
api := NewHAProxyAPI()
// To run this test make sure HAProxy is installed on the machine and
// configs/haproxy.cfg is placed in /etc/haproxy/ folder
api := NewHAProxyAPI("dummy")

api.DeleteAllDataplanes()
if k, _ := api.ListDataplanes(); len(k) != 0 {
Expand Down
27 changes: 14 additions & 13 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ import (
)

type ControlPlaneConfig struct {
Port string `mapstructure:"port"`
Replicas []string `mapstructure:"replicas"`
PortRegistration string `mapstructure:"portRegistration"`
Verbosity string `mapstructure:"verbosity"`
TraceOutputFolder string `mapstructure:"traceOutputFolder"`
PlacementPolicy string `mapstructure:"placementPolicy"`
Persistence bool `mapstructure:"persistence"`
Profiler ProfilerConfig `mapstructure:"profiler"`
RedisConf RedisConf `mapstructure:"redis"`
Reconstruct bool `mapstructure:"reconstruct"`
RemoveWorkerNode bool `mapstructure:"removeWorkerNode"`
RemoveDataplane bool `mapstructure:"removeDataplane"`
PrecreateSnapshots bool `mapstructure:"precreateSnapshots"`
Port string `mapstructure:"port"`
Replicas []string `mapstructure:"replicas"`
PortRegistration string `mapstructure:"portRegistration"`
Verbosity string `mapstructure:"verbosity"`
TraceOutputFolder string `mapstructure:"traceOutputFolder"`
PlacementPolicy string `mapstructure:"placementPolicy"`
Persistence bool `mapstructure:"persistence"`
Profiler ProfilerConfig `mapstructure:"profiler"`
RedisConf RedisConf `mapstructure:"redis"`
Reconstruct bool `mapstructure:"reconstruct"`
LoadBalancerAddress string `mapstructure:"loadBalancerAddress"`
RemoveWorkerNode bool `mapstructure:"removeWorkerNode"`
RemoveDataplane bool `mapstructure:"removeDataplane"`
PrecreateSnapshots bool `mapstructure:"precreateSnapshots"`
}

type DataPlaneConfig struct {
Expand Down

0 comments on commit d26b4bd

Please sign in to comment.