Skip to content

Commit

Permalink
Guard NodeID to handle race condition (TykTechnologies#2436)
Browse files Browse the repository at this point in the history
  • Loading branch information
furkansenharputlu authored and buger committed Jul 29, 2019
1 parent fc90b2d commit 486cc8b
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 19 deletions.
4 changes: 2 additions & 2 deletions gateway/api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ func (a APIDefinitionLoader) FromDashboardService(endpoint, secret string) ([]*A
}

newRequest.Header.Set("authorization", secret)
log.Debug("Using: NodeID: ", NodeID)
newRequest.Header.Set(headers.XTykNodeID, NodeID)
log.Debug("Using: NodeID: ", getNodeID())
newRequest.Header.Set(headers.XTykNodeID, getNodeID())

newRequest.Header.Set(headers.XTykNonce, ServiceNonce)

Expand Down
11 changes: 6 additions & 5 deletions gateway/dashboard_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (h *HTTPDashboardHandler) NotifyDashboardOfEvent(event interface{}) error {
}

req.Header.Set("authorization", h.Secret)
req.Header.Set(headers.XTykNodeID, NodeID)
req.Header.Set(headers.XTykNodeID, getNodeID())
req.Header.Set(headers.XTykNonce, ServiceNonce)

c := initialiseClient(5 * time.Second)
Expand Down Expand Up @@ -168,14 +168,15 @@ func (h *HTTPDashboardHandler) Register() error {

// Set the NodeID
var found bool
NodeID, found = val.Message["NodeID"]
nodeID, found := val.Message["NodeID"]
setNodeID(nodeID)
if !found {
dashLog.Error("Failed to register node, retrying in 5s")
time.Sleep(time.Second * 5)
return h.Register()
}

dashLog.WithField("id", NodeID).Info("Node Registered")
dashLog.WithField("id", getNodeID()).Info("Node Registered")

// Set the nonce
ServiceNonce = val.Nonce
Expand Down Expand Up @@ -217,7 +218,7 @@ func (h *HTTPDashboardHandler) newRequest(endpoint string) *http.Request {
}

func (h *HTTPDashboardHandler) sendHeartBeat(req *http.Request, client *http.Client) error {
req.Header.Set(headers.XTykNodeID, NodeID)
req.Header.Set(headers.XTykNodeID, getNodeID())
req.Header.Set(headers.XTykNonce, ServiceNonce)

resp, err := client.Do(req)
Expand All @@ -244,7 +245,7 @@ func (h *HTTPDashboardHandler) sendHeartBeat(req *http.Request, client *http.Cli
func (h *HTTPDashboardHandler) DeRegister() error {
req := h.newRequest(h.DeRegistrationEndpoint)

req.Header.Set(headers.XTykNodeID, NodeID)
req.Header.Set(headers.XTykNodeID, getNodeID())
req.Header.Set(headers.XTykNonce, ServiceNonce)

c := initialiseClient(5 * time.Second)
Expand Down
6 changes: 3 additions & 3 deletions gateway/distributed_rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var DRLManager = &drl.DRL{}
func setupDRL() {
drlManager := &drl.DRL{}
drlManager.Init()
drlManager.ThisServerID = NodeID + "|" + hostDetails.Hostname
drlManager.ThisServerID = getNodeID() + "|" + hostDetails.Hostname
log.Debug("DRL: Setting node ID: ", drlManager.ThisServerID)
DRLManager = drlManager
}
Expand All @@ -29,7 +29,7 @@ func startRateLimitNotifications() {
go func() {
log.Info("Starting gateway rate limiter notifications...")
for {
if NodeID != "" {
if getNodeID() != "" {
NotifyCurrentServerStatus()
} else {
log.Warning("Node not registered yet, skipping DRL Notification")
Expand Down Expand Up @@ -60,7 +60,7 @@ func NotifyCurrentServerStatus() {

server := drl.Server{
HostName: hostDetails.Hostname,
ID: NodeID,
ID: getNodeID(),
LoadPerSec: rate,
TagHash: getTagHash(),
}
Expand Down
2 changes: 1 addition & 1 deletion gateway/le_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func onLESSLStatusReceivedHandler(payload string) {
log.Debug("Received LE data: ", serverData)

// not great
if serverData.ID != NodeID {
if serverData.ID != getNodeID() {
log.Info("Received Redis LE change notification!")
GetLEState(&LE_MANAGER)
}
Expand Down
2 changes: 1 addition & 1 deletion gateway/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func LoadPoliciesFromDashboard(endpoint, secret string, allowExplicit bool) map[
}

newRequest.Header.Set("authorization", secret)
newRequest.Header.Set("x-tyk-nodeid", NodeID)
newRequest.Header.Set("x-tyk-nodeid", getNodeID())

newRequest.Header.Set("x-tyk-nonce", ServiceNonce)

Expand Down
6 changes: 3 additions & 3 deletions gateway/redis_signal_handle_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func handleNewConfiguration(payload string) {
}

// Make sure payload matches nodeID and hostname
if configPayload.ForHostname != hostDetails.Hostname && configPayload.ForNodeID != NodeID {
if configPayload.ForHostname != hostDetails.Hostname && configPayload.ForNodeID != getNodeID() {
log.WithFields(logrus.Fields{
"prefix": "pub-sub",
}).Info("Configuration update received, no NodeID/Hostname match found")
Expand Down Expand Up @@ -151,7 +151,7 @@ func handleSendMiniConfig(payload string) {
}

// Make sure payload matches nodeID and hostname
if configPayload.FromHostname != hostDetails.Hostname && configPayload.FromNodeID != NodeID {
if configPayload.FromHostname != hostDetails.Hostname && configPayload.FromNodeID != getNodeID() {
log.WithFields(logrus.Fields{
"prefix": "pub-sub",
}).Debug("Configuration request received, no NodeID/Hostname match found, ignoring")
Expand All @@ -168,7 +168,7 @@ func handleSendMiniConfig(payload string) {

returnPayload := ReturnConfigPayload{
FromHostname: hostDetails.Hostname,
FromNodeID: NodeID,
FromNodeID: getNodeID(),
Configuration: config,
TimeStamp: time.Now().Unix(),
}
Expand Down
23 changes: 19 additions & 4 deletions gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ var (
LE_MANAGER letsencrypt.Manager
LE_FIRSTRUN bool

NodeID string
muNodeID sync.Mutex // guards NodeID
NodeID string

runningTests = false

Expand All @@ -109,6 +110,20 @@ const (
appName = "tyk-gateway"
)

// setNodeID writes NodeID safely.
func setNodeID(nodeID string) {
muNodeID.Lock()
NodeID = nodeID
muNodeID.Unlock()
}

// getNodeID reads NodeID safely.
func getNodeID() string {
muNodeID.Lock()
defer muNodeID.Unlock()
return NodeID
}

func getApiSpec(apiID string) *APISpec {
apisMu.RLock()
spec := apisByID[apiID]
Expand Down Expand Up @@ -961,7 +976,7 @@ func Start() {
os.Exit(0)
}

NodeID = "solo-" + uuid.NewV4().String()
setNodeID("solo-" + uuid.NewV4().String())

if err := initialiseSystem(); err != nil {
mainLog.Fatalf("Error initialising system: %v", err)
Expand All @@ -986,7 +1001,7 @@ func Start() {
time.Sleep(10 * time.Second)

os.Setenv("TYK_SERVICE_NONCE", ServiceNonce)
os.Setenv("TYK_SERVICE_NODEID", NodeID)
os.Setenv("TYK_SERVICE_NODEID", getNodeID())
}
}

Expand Down Expand Up @@ -1332,7 +1347,7 @@ func listen(listener, controlListener net.Listener, err error) {
handleDashboardRegistration()

} else {
NodeID = nodeID
setNodeID(nodeID)
ServiceNonce = nonce
mainLog.Info("State recovered")

Expand Down

0 comments on commit 486cc8b

Please sign in to comment.