Skip to content

Commit

Permalink
New set of RPC fixes (TykTechnologies#1498)
Browse files Browse the repository at this point in the history
- Removed deadlocks caused in some cases by mutexes
- Enable oAuth for RPC emergency mode,  should work since policies are here
- Simplified `listen` reload logic
- Simplified keep alive function
- Restrict concurrent "Connect" calls
- Set default connection pool to 20 (Our hybrid docker image already have it but in config file)
- Fixed concurrency issue causing multiple reloads when backup is loaded
- Added tests for protected APIs when in backup mode
  • Loading branch information
buger authored Feb 26, 2018
1 parent 8c38ce8 commit 6e39a5e
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 149 deletions.
5 changes: 2 additions & 3 deletions analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,11 @@ func (r *RedisAnalyticsHandler) Init() {

// RecordHit will store an AnalyticsRecord in Redis
func (r *RedisAnalyticsHandler) RecordHit(record AnalyticsRecord) error {
configMu.Lock()
defer configMu.Unlock()

r.AnalyticsPool.SendWork(func() {
// If we are obfuscating API Keys, store the hashed representation (config check handled in hashing function)
record.APIKey = storage.HashKey(record.APIKey)

configMu.Lock()
if config.Global.SlaveOptions.UseRPC {
// Extend tag list to include this data so wecan segment by node if necessary
record.Tags = append(record.Tags, "tyk-hybrid-rpc")
Expand All @@ -194,6 +192,7 @@ func (r *RedisAnalyticsHandler) RecordHit(record AnalyticsRecord) error {
// Extend tag list to include this data so wecan segment by node if necessary
record.Tags = append(record.Tags, config.Global.DBAppConfOptions.Tags...)
}
configMu.Unlock()

// Lets add some metadata
if record.APIKey != "" {
Expand Down
5 changes: 3 additions & 2 deletions api_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ func (h *DefaultHealthChecker) CreateKeyName(subKey HealthPrefix) string {
// reportHealthValue is a shortcut we can use throughout the app to push a health check value
func reportHealthValue(spec *APISpec, counter HealthPrefix, value string) {
configMu.Lock()
defer configMu.Unlock()

if !config.Global.HealthCheck.EnableHealthChecks {
configMu.Unlock()
return
}
configMu.Unlock()

spec.Health.StoreCounterVal(counter, value)
}

Expand Down
13 changes: 5 additions & 8 deletions api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,11 @@ func processSpec(spec *APISpec, apisByListen map[string]int,

if spec.UseOauth2 {
log.Debug("Loading OAuth Manager")
if !rpcEmergencyMode {
oauthManager := addOAuthHandlers(spec, subrouter)
log.Debug("-- Added OAuth Handlers")
oauthManager := addOAuthHandlers(spec, subrouter)
log.Debug("-- Added OAuth Handlers")

spec.OAuthManager = oauthManager
log.Debug("Done loading OAuth Manager")
} else {
log.Warning("RPC Emergency mode detected! OAuth APIs will not function!")
}
spec.OAuthManager = oauthManager
log.Debug("Done loading OAuth Manager")
}

enableVersionOverrides := false
Expand Down Expand Up @@ -667,6 +663,7 @@ func loadApps(specs []*APISpec, muxer *mux.Router) {
}).Info("Initialised API Definitions")

if config.Global.SlaveOptions.UseRPC {
startRPCKeepaliveWatcher(rpcAuthStore)
startRPCKeepaliveWatcher(rpcOrgStore)
}
}
1 change: 0 additions & 1 deletion handler_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (e *ErrorHandler) HandleError(w http.ResponseWriter, r *http.Request, errMs

ip := requestIP(r)
if config.Global.StoreAnalytics(ip) {

t := time.Now()

addVersionHeader(w, r)
Expand Down
2 changes: 2 additions & 0 deletions handler_success.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (s *SuccessHandler) RecordHit(r *http.Request, timing int64, code int, requ

rawRequest := ""
rawResponse := ""

if recordDetail(r) {
// Get the wire format representation
var wireFormatReq bytes.Buffer
Expand Down Expand Up @@ -250,6 +251,7 @@ func (s *SuccessHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) *http
if recordDetail(r) {
copiedResponse = copyResponse(resp)
}

s.RecordHit(r, int64(millisec), resp.StatusCode, copiedRequest, copiedResponse)
}
log.Debug("Done proxy")
Expand Down
5 changes: 5 additions & 0 deletions helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,11 @@ func updateAPIVersion(spec *APISpec, name string, verGen func(version *apidef.Ve
spec.VersionData.Versions[name] = version
}

func jsonMarshalString(i interface{}) (out string) {
b, _ := json.Marshal(i)
return string(b)
}

func buildAPI(apiGens ...func(spec *APISpec)) (specs []*APISpec) {
if len(apiGens) == 0 {
apiGens = append(apiGens, func(spec *APISpec) {})
Expand Down
62 changes: 19 additions & 43 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,10 +690,6 @@ func doReload() {
}).Info("API reload complete")

mainRouter = newRouter

// // Unset these
// rpcEmergencyModeLoaded = false
// rpcEmergencyMode = false
}

// startReloadChan and reloadDoneChan are used by the two reload loops
Expand Down Expand Up @@ -1218,6 +1214,7 @@ func start() {
Address: config.Global.SlaveOptions.ConnectionString,
SuppressRegister: true,
}

RPCListener.Connect()
go rpcReloadLoop(config.Global.SlaveOptions.RPCKey)
go RPCListener.StartRPCLoopCheck(config.Global.SlaveOptions.RPCKey)
Expand Down Expand Up @@ -1336,7 +1333,15 @@ func listen(l, controlListener net.Listener, err error) {

drlOnce.Do(startDRL)

// Handle reload when SIGUSR2 is received
if config.Global.ControlAPIPort > 0 {
loadAPIEndpoints(controlRouter)
}

if !rpcEmergencyMode {
doReload()
}

// Error not empty if handle reload when SIGUSR2 is received
if err != nil {
// Listen on a TCP or a UNIX domain socket (TCP here).
log.WithFields(logrus.Fields{
Expand All @@ -1346,18 +1351,6 @@ func listen(l, controlListener net.Listener, err error) {
// handle dashboard registration and nonces if available
handleDashboardRegistration()

if !rpcEmergencyMode {
count := syncAPISpecs()
if count > 0 {
loadGlobalApps(mainRouter)
syncPolicies()
}

if config.Global.ControlAPIPort > 0 {
loadAPIEndpoints(controlRouter)
}
}

// Use a custom server so we can control keepalives
if config.Global.HttpServerOptions.OverrideDefaults {
mainRouter.SkipClean(config.Global.HttpServerOptions.SkipURLCleaning)
Expand Down Expand Up @@ -1393,10 +1386,8 @@ func listen(l, controlListener net.Listener, err error) {

go http.Serve(l, mainHandler{})

if !rpcEmergencyMode {
if controlListener != nil {
go http.Serve(controlListener, controlRouter)
}
if controlListener != nil {
go http.Serve(controlListener, controlRouter)
}
}
} else {
Expand All @@ -1420,21 +1411,8 @@ func listen(l, controlListener net.Listener, err error) {
os.Setenv("TYK_SERVICE_NODEID", "")
}

// Resume accepting connections in a new goroutine.
if !rpcEmergencyMode {
count := syncAPISpecs()
if count > 0 {
loadGlobalApps(mainRouter)
syncPolicies()
}

if config.Global.ControlAPIPort > 0 {
loadAPIEndpoints(controlRouter)
}

if config.Global.UseDBAppConfigs {
go DashService.StartBeating()
}
if config.Global.UseDBAppConfigs {
go DashService.StartBeating()
}

if config.Global.HttpServerOptions.OverrideDefaults {
Expand Down Expand Up @@ -1470,14 +1448,12 @@ func listen(l, controlListener net.Listener, err error) {

go http.Serve(l, mainHandler{})

if !rpcEmergencyMode {
if controlListener != nil {
log.WithFields(logrus.Fields{
"prefix": "main",
}).Info("Control API listener started: ", controlListener, controlRouter)
if controlListener != nil {
log.WithFields(logrus.Fields{
"prefix": "main",
}).Info("Control API listener started: ", controlListener, controlRouter)

go http.Serve(controlListener, controlRouter)
}
go http.Serve(controlListener, controlRouter)
}
}

Expand Down
Loading

0 comments on commit 6e39a5e

Please sign in to comment.