Skip to content

Commit

Permalink
Cluster operation
Browse files Browse the repository at this point in the history
First steps to enable cluster operation for R3 instances, based on multiple application server nodes and a shared, single database.

# Implemented
* Node self registration in shared database.
* Node task distribution.
* Node task collection and execution.
* Node based logging.
* Node statistics (WIP).

# Not yet implemented
* Filter logs by node.
* Cluster admin UI with options and statistics.
* Tasks: Client re-authentication/disconnect, license update
  • Loading branch information
r3-gabriel committed Jun 5, 2022
1 parent 2d4db98 commit 03338bd
Show file tree
Hide file tree
Showing 35 changed files with 449 additions and 108 deletions.
10 changes: 3 additions & 7 deletions cache/cache_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,15 @@ func RenewAccessById(loginId int64) error {
}

// update clients
func ChangedConfig() {
ClientEvent_handlerChan <- types.ClientEvent{LoginId: 0, ConfigChanged: true}
}
func KickLoginById(loginId int64) { // kick single login
ClientEvent_handlerChan <- types.ClientEvent{LoginId: loginId, Kick: true}
}
func KickNonAdmins() { // kick all non-admins
ClientEvent_handlerChan <- types.ClientEvent{LoginId: 0, KickNonAdmin: true}
}
func ChangedBuilderMode(modeActive bool) {
if modeActive {
ClientEvent_handlerChan <- types.ClientEvent{LoginId: 0, BuilderOn: true}
} else {
ClientEvent_handlerChan <- types.ClientEvent{LoginId: 0, BuilderOff: true}
}
}

// load access permissions for login ID into cache
func load(loginId int64, renewal bool) error {
Expand Down
14 changes: 14 additions & 0 deletions cache/cache_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package cache

import "github.com/gofrs/uuid"

var (
nodeId uuid.UUID // ID of node, self assigned on startup if not set
)

func GetNodeId() uuid.UUID {
return nodeId
}
func SetNodeId(id uuid.UUID) {
nodeId = id
}
5 changes: 1 addition & 4 deletions cache/cache_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,7 @@ func GetSchemaCacheJson() json.RawMessage {
// update module schema cache in memory
// takes either single module ID for specific update or NULL for updating all modules
// can just load schema or create a new version timestamp, which forces reload on clients
func UpdateSchemaAll(newVersion bool) error {
return UpdateSchema(make([]uuid.UUID, 0), newVersion)
}
func UpdateSchema(moduleIdsUpdateOnly []uuid.UUID, newVersion bool) error {
func UpdateSchema(newVersion bool, moduleIdsUpdateOnly []uuid.UUID) error {
var err error

// inform all clients about schema reloading
Expand Down
4 changes: 3 additions & 1 deletion cache/captions/de_de
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
"logLevelApplication":"Anwendungen",
"logLevelBackup":"Backup",
"logLevelCache":"Cache",
"logLevelCluster":"Cluster",
"logLevelCsv":"CSV-Import/Export",
"logLevelLdap":"LDAP",
"logLevelMail":"Mail",
Expand Down Expand Up @@ -280,7 +281,8 @@
"level2":"Warnung",
"level3":"Info",
"message":"Nachricht",
"module":"Anwendung"
"module":"Anwendung",
"node":"Cluster-Knoten"
},
"mails":{
"account":"E-Mail-Account",
Expand Down
4 changes: 3 additions & 1 deletion cache/captions/en_us
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
"logLevelApplication":"Applications",
"logLevelBackup":"Backup",
"logLevelCache":"Cache",
"logLevelCluster":"Cluster",
"logLevelCsv":"CSV import/export",
"logLevelLdap":"LDAP",
"logLevelMail":"Mail",
Expand Down Expand Up @@ -280,7 +281,8 @@
"level2":"Warning",
"level3":"Info",
"message":"Message",
"module":"Application"
"module":"Application",
"node":"Cluster node"
},
"mails":{
"account":"Email account",
Expand Down
4 changes: 3 additions & 1 deletion cache/captions/it_it
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
"logLevelApplication":"Applicazioni",
"logLevelBackup":"Backup",
"logLevelCache":"Cache",
"logLevelCluster":"Cluster",
"logLevelCsv":"Importa/esporta CSV",
"logLevelLdap":"LDAP",
"logLevelMail":"Posta",
Expand Down Expand Up @@ -280,7 +281,8 @@
"level2":"Avvertimento",
"level3":"Informazione",
"message":"Messaggio",
"module":"Applicazione"
"module":"Applicazione",
"node":"Cluster node"
},
"mails":{
"account":"Account email",
Expand Down
4 changes: 3 additions & 1 deletion cache/captions/ro_ro
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
"logLevelApplication":"Aplicații",
"logLevelBackup":"Backup",
"logLevelCache":"Tampon",
"logLevelCluster":"Cluster",
"logLevelCsv":"CSV import/export",
"logLevelLdap":"LDAP",
"logLevelMail":"Mail",
Expand Down Expand Up @@ -280,7 +281,8 @@
"level2":"Warning",
"level3":"Info",
"message":"Message",
"module":"Application"
"module":"Application",
"node":"Cluster node"
},
"mails":{
"account":"Cont de email",
Expand Down
91 changes: 91 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package cluster

import (
"r3/cache"
"r3/config"
"r3/db"
"r3/handler/websocket"
"r3/log"
"r3/tools"
"runtime"
"time"

"github.com/gofrs/uuid"
)

// regularly update statistics for this node in shared database
func StatUpdater() {
log.Info("cluster", "started statistics updater routine")

for {
time.Sleep(time.Second * time.Duration(60))

var m runtime.MemStats
runtime.ReadMemStats(&m)

memoryMb := m.Sys / 1024 / 1024
uptime := 0

if _, err := db.Pool.Exec(db.Ctx, `
UPDATE instance_cluster.node
SET date_check_in = $1, stat_sessions = $2, stat_memory = $3,
stat_uptime = $4
WHERE id = $5
`, tools.GetTimeUnix(), websocket.GetClientCount(), memoryMb, uptime, cache.GetNodeId()); err != nil {
log.Error("cluster", "failed to update cluster statistics", err)
}
}
}

// register cluster node with shared database
// read existing node ID from configuration file if exists
func SetupNode() error {

// create node ID for itself if it does not exist yet
if config.File.Cluster.NodeId == "" {
id, err := uuid.NewV4()
if err != nil {
return err
}

// write node ID to local config file
config.File.Cluster.NodeId = id.String()

if err := config.WriteFile(); err != nil {
return err
}
}

// read node ID from config file
nodeId, err := uuid.FromString(config.File.Cluster.NodeId)
if err != nil {
return err
}

// store node ID for other uses
cache.SetNodeId(nodeId)
log.SetNodeId(nodeId)

// check whether node is already registered
var exists bool
if err := db.Pool.QueryRow(db.Ctx, `
SELECT EXISTS(
SELECT id
FROM instance_cluster.node
WHERE id = $1
)
`, nodeId).Scan(&exists); err != nil {
return err
}

if !exists {
if _, err := db.Pool.Exec(db.Ctx, `
INSERT INTO instance_cluster.node (id, name, date_check_in,
stat_sessions, stat_memory, stat_uptime)
VALUES ($1,$2,0,-1,-1,-1)
`, nodeId, nodeId.String()); err != nil {
return err
}
}
return nil
}
163 changes: 163 additions & 0 deletions cluster/tasks/tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package tasks

import (
"encoding/json"
"fmt"
"r3/bruteforce"
"r3/cache"
"r3/config"
"r3/db"
"r3/log"
"time"

"github.com/gofrs/uuid"
)

type task struct {
action string
payload []byte
}
type taskConfigApply struct {
SwitchToMaintenance bool `json:"switchToMaintenance"`
}
type taskSchemaLoadAll struct {
NewVersion bool `json:"newVersion"`
}
type taskSchemaLoad struct {
ModuleIdsUpdateOnly []uuid.UUID `json:"moduleIdsUpdateOnly"`
NewVersion bool `json:"newVersion"`
}

// regularly collect tasks for this node to execute
func TaskCollector() {
log.Info("cluster", "started task collector routine")
var tasks []task

for {
time.Sleep(time.Second * time.Duration(3))

tasks = make([]task, 0)

rows, err := db.Pool.Query(db.Ctx, `
SELECT action, payload
FROM instance_cluster.node_task
WHERE node_id = $1
`, cache.GetNodeId())
if err != nil {
log.Error("cluster", "failed to read node tasks from database", err)
continue
}

for rows.Next() {
var t task
if err := rows.Scan(&t.action, &t.payload); err != nil {
log.Error("cluster", "failed to scan node task from database", err)
continue
}
tasks = append(tasks, t)
}
rows.Close()

// no tasks, nothing to do
if len(tasks) == 0 {
log.Info("cluster", "task collector has found no tasks to execute")
continue
}

// delete collected tasks
if _, err := db.Pool.Exec(db.Ctx, `
DELETE FROM instance_cluster.node_task
WHERE node_id = $1
`, cache.GetNodeId()); err != nil {
log.Error("cluster", "failed to delete node tasks from database", err)
continue
}

// execute collected tasks
for _, t := range tasks {
log.Info("cluster", fmt.Sprintf("executing task '%s'", t.action))

switch t.action {
case "configApply":
var p taskConfigApply
if err := json.Unmarshal(t.payload, &p); err != nil {
log.Error("cluster", "failed to unmarshal task", err)
continue
}
if err := ConfigApply(false, true, p.SwitchToMaintenance); err != nil {
log.Error("cluster", fmt.Sprintf("failed to start task '%s'", t.action), err)
continue
}
case "schemaLoad":
var p taskSchemaLoad
if err := json.Unmarshal(t.payload, &p); err != nil {
log.Error("cluster", "failed to unmarshal task", err)
continue
}
if err := SchemaLoad(false, p.NewVersion, p.ModuleIdsUpdateOnly); err != nil {
log.Error("cluster", fmt.Sprintf("failed to start task '%s'", t.action), err)
continue
}
}
}
}
}

func setTasks(action string, payload interface{}) error {
payloadJson, err := json.Marshal(payload)
if err != nil {
return err
}

_, err = db.Pool.Exec(db.Ctx, `
INSERT INTO instance_cluster.node_task (node_id, action, payload)
SELECT id, $1, $2
FROM instance_cluster.node
WHERE id <> $3
`, action, payloadJson, cache.GetNodeId())

return err
}

// tasks to execute, relevant to all cluster nodes
func ConfigApply(updateNodes bool, fromOtherNode bool, switchToMaintenance bool) error {
if updateNodes {
payload := taskConfigApply{
SwitchToMaintenance: switchToMaintenance,
}
if err := setTasks("configApply", payload); err != nil {
return err
}
}

// if change occured on other node, load config from database
if fromOtherNode {
config.LoadFromDb()
}

// update websocket clients if relevant config changed
if switchToMaintenance {
cache.KickNonAdmins()
}

// apply config to other areas
bruteforce.SetConfig()
cache.ChangedConfig()
config.SetLogLevels()
return nil
}
func SchemaLoadAll(updateNodes bool, newVersion bool) error {
return SchemaLoad(updateNodes, newVersion, make([]uuid.UUID, 0))
}
func SchemaLoad(updateNodes bool, newVersion bool, moduleIdsUpdateOnly []uuid.UUID) error {
if updateNodes {
payload := taskSchemaLoad{
ModuleIdsUpdateOnly: moduleIdsUpdateOnly,
NewVersion: newVersion,
}
if err := setTasks("schemaLoad", payload); err != nil {
return err
}
}
return cache.UpdateSchema(newVersion, moduleIdsUpdateOnly)
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func SetLogLevels() {
log.SetLogLevel("application", int(GetUint64("logApplication")))
log.SetLogLevel("backup", int(GetUint64("logBackup")))
log.SetLogLevel("cache", int(GetUint64("logCache")))
log.SetLogLevel("cluster", int(GetUint64("logCluster")))
log.SetLogLevel("csv", int(GetUint64("logCsv")))
log.SetLogLevel("ldap", int(GetUint64("logLdap")))
log.SetLogLevel("mail", int(GetUint64("logMail")))
Expand Down
Loading

0 comments on commit 03338bd

Please sign in to comment.