Skip to content

Commit

Permalink
storage: new package, with the interface and redis
Browse files Browse the repository at this point in the history
Move storage_handlers.go and redis_cluster_handler.go into this package.
We will keep anything related to storage decoupled there.

We can't put rpc_storage_handler.go there yet, as it pulls in other
parts of the main package and needs some further work. For example:

	./rpc_storage_handler.go:241:14: undefined: LoadDefinitionsFromRPCBackup
	./rpc_storage_handler.go:246:4: undefined: doLoadWithBackup
	./rpc_storage_handler.go:657:4: undefined: MainNotifier
	./rpc_storage_handler.go:657:24: undefined: Notification
	./rpc_storage_handler.go:718:5: undefined: handleDeleteHashedKey
	./rpc_storage_handler.go:722:4: undefined: handleDeleteKey

For now, this is better than nothing and it will help us to further pick
the main package into pieces.

Only a couple of package members needed exporting - the former doHash
and errKeyNotFound names.
  • Loading branch information
mvdan authored and buger committed Oct 6, 2017
1 parent 51b05b9 commit add9130
Show file tree
Hide file tree
Showing 25 changed files with 139 additions and 113 deletions.
5 changes: 3 additions & 2 deletions analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"gopkg.in/vmihailenco/msgpack.v2"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

// AnalyticsRecord encodes the details of a request
Expand Down Expand Up @@ -144,7 +145,7 @@ func (a *AnalyticsRecord) SetExpiry(expiresInSeconds int64) {
// RedisAnalyticsHandler will record analytics data to a redis back end
// as defined in the Config object
type RedisAnalyticsHandler struct {
Store StorageHandler
Store storage.Handler
Clean Purger
GeoIPDB *maxminddb.Reader

Expand Down Expand Up @@ -180,7 +181,7 @@ func (r *RedisAnalyticsHandler) RecordHit(record AnalyticsRecord) error {

r.AnalyticsPool.SendWork(func() {
// If we are obfuscating API Keys, store the hashed representation (config check handled in hashing function)
record.APIKey = hashKey(record.APIKey)
record.APIKey = storage.HashKey(record.APIKey)

if config.Global.SlaveOptions.UseRPC {
// Extend tag list to include this data so wecan segment by node if necessary
Expand Down
5 changes: 3 additions & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

// APIModifyKeySuccess represents when a Key modification was successful
Expand Down Expand Up @@ -730,7 +731,7 @@ func handleOrgAddOrUpdate(keyName string, r *http.Request) (interface{}, int) {
if r.URL.Query().Get("reset_quota") == "1" {
sessionManager.ResetQuota(keyName, newSession)
newSession.QuotaRenews = time.Now().Unix() + newSession.QuotaRenewalRate
rawKey := QuotaKeyPrefix + hashKey(keyName)
rawKey := QuotaKeyPrefix + storage.HashKey(keyName)

// manage quotas separately
DefaultQuotaStore.RemoveSession(rawKey)
Expand Down Expand Up @@ -1339,7 +1340,7 @@ func invalidateCacheHandler(w http.ResponseWriter, r *http.Request) {

keyPrefix := "cache-" + apiID
matchPattern := keyPrefix + "*"
store := &RedisClusterStorageManager{KeyPrefix: keyPrefix, IsCache: true}
store := &storage.RedisCluster{KeyPrefix: keyPrefix, IsCache: true}

if ok := store.DeleteScanMatch(matchPattern); !ok {
err := errors.New("scan/delete failed")
Expand Down
3 changes: 2 additions & 1 deletion api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

const (
Expand Down Expand Up @@ -749,7 +750,7 @@ func (a APIDefinitionLoader) getExtendedPathSpecs(apiVersionDef apidef.VersionIn
return combinedPath, len(whiteListPaths) > 0
}

func (a *APISpec) Init(authStore, sessionStore, healthStore, orgStore StorageHandler) {
func (a *APISpec) Init(authStore, sessionStore, healthStore, orgStore storage.Handler) {
a.AuthManager.Init(authStore)
a.SessionManager.Init(sessionStore)
a.Health.Init(healthStore)
Expand Down
7 changes: 4 additions & 3 deletions api_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

type HealthPrefix string
Expand All @@ -20,7 +21,7 @@ const (
)

type HealthChecker interface {
Init(StorageHandler)
Init(storage.Handler)
ApiHealthValues() (HealthCheckValues, error)
StoreCounterVal(HealthPrefix, string)
}
Expand All @@ -34,13 +35,13 @@ type HealthCheckValues struct {
}

type DefaultHealthChecker struct {
storage StorageHandler
storage storage.Handler
APIID string
}

var healthWarn sync.Once

func (h *DefaultHealthChecker) Init(storeType StorageHandler) {
func (h *DefaultHealthChecker) Init(storeType storage.Handler) {
if config.Global.HealthCheck.EnableHealthChecks {
log.Debug("Health Checker initialised.")
healthWarn.Do(func() {
Expand Down
13 changes: 7 additions & 6 deletions api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/coprocess"
"github.com/TykTechnologies/tyk/storage"
)

type ChainObject struct {
Expand All @@ -27,10 +28,10 @@ type ChainObject struct {
Subrouter *mux.Router
}

func prepareStorage() (*RedisClusterStorageManager, *RedisClusterStorageManager, *RedisClusterStorageManager, *RPCStorageHandler, *RPCStorageHandler) {
redisStore := RedisClusterStorageManager{KeyPrefix: "apikey-", HashKeys: config.Global.HashKeys}
redisOrgStore := RedisClusterStorageManager{KeyPrefix: "orgkey."}
healthStore := &RedisClusterStorageManager{KeyPrefix: "apihealth."}
func prepareStorage() (*storage.RedisCluster, *storage.RedisCluster, *storage.RedisCluster, *RPCStorageHandler, *RPCStorageHandler) {
redisStore := storage.RedisCluster{KeyPrefix: "apikey-", HashKeys: config.Global.HashKeys}
redisOrgStore := storage.RedisCluster{KeyPrefix: "orgkey."}
healthStore := &storage.RedisCluster{KeyPrefix: "apihealth."}
rpcAuthStore := RPCStorageHandler{KeyPrefix: "apikey-", HashKeys: config.Global.HashKeys, UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString}
rpcOrgStore := RPCStorageHandler{KeyPrefix: "orgkey.", UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString}

Expand Down Expand Up @@ -98,7 +99,7 @@ func countApisByListenHash(specs []*APISpec) map[string]int {
}

func processSpec(spec *APISpec, apisByListen map[string]int,
redisStore, redisOrgStore, healthStore, rpcAuthStore, rpcOrgStore StorageHandler,
redisStore, redisOrgStore, healthStore, rpcAuthStore, rpcOrgStore storage.Handler,
subrouter *mux.Router) *ChainObject {

var chainDef ChainObject
Expand Down Expand Up @@ -260,7 +261,7 @@ func processSpec(spec *APISpec, apisByListen map[string]int,
}

keyPrefix := "cache-" + spec.APIID
cacheStore := &RedisClusterStorageManager{KeyPrefix: keyPrefix, IsCache: true}
cacheStore := &storage.RedisCluster{KeyPrefix: keyPrefix, IsCache: true}
cacheStore.Connect()

var chain http.Handler
Expand Down
3 changes: 2 additions & 1 deletion api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

const apiTestDef = `{
Expand Down Expand Up @@ -541,7 +542,7 @@ func TestResetHandlerBlock(t *testing.T) {
func TestGroupResetHandler(t *testing.T) {
didSubscribe := make(chan bool)
didReload := make(chan bool)
cacheStore := RedisClusterStorageManager{}
cacheStore := storage.RedisCluster{}
cacheStore.Connect()

go func() {
Expand Down
23 changes: 12 additions & 11 deletions auth_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/satori/go.uuid"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"

"github.com/Sirupsen/logrus"
)
Expand All @@ -18,34 +19,34 @@ import (
// is valid in any way (e.g. cryptographic signing etc.). Returns
// a SessionState object (deserialised JSON)
type AuthorisationHandler interface {
Init(StorageHandler)
Init(storage.Handler)
KeyAuthorised(string) (SessionState, bool)
KeyExpired(*SessionState) bool
}

// SessionHandler handles all update/create/access session functions and deals exclusively with
// SessionState objects, not identity
type SessionHandler interface {
Init(store StorageHandler)
Init(store storage.Handler)
UpdateSession(keyName string, session *SessionState, resetTTLTo int64) error
RemoveSession(keyName string)
SessionDetail(keyName string) (SessionState, bool)
Sessions(filter string) []string
Store() StorageHandler
Store() storage.Handler
ResetQuota(string, *SessionState)
}

// DefaultAuthorisationManager implements AuthorisationHandler,
// requires a StorageHandler to interact with key store
// requires a storage.Handler to interact with key store
type DefaultAuthorisationManager struct {
store StorageHandler
store storage.Handler
}

type DefaultSessionManager struct {
store StorageHandler
store storage.Handler
}

func (b *DefaultAuthorisationManager) Init(store StorageHandler) {
func (b *DefaultAuthorisationManager) Init(store storage.Handler) {
b.store = store
b.store.Connect()
}
Expand Down Expand Up @@ -80,25 +81,25 @@ func (b *DefaultAuthorisationManager) KeyExpired(newSession *SessionState) bool
return false
}

func (b *DefaultSessionManager) Init(store StorageHandler) {
func (b *DefaultSessionManager) Init(store storage.Handler) {
b.store = store
b.store.Connect()
}

func (b *DefaultSessionManager) Store() StorageHandler {
func (b *DefaultSessionManager) Store() storage.Handler {
return b.store
}

func (b *DefaultSessionManager) ResetQuota(keyName string, session *SessionState) {

rawKey := QuotaKeyPrefix + hashKey(keyName)
rawKey := QuotaKeyPrefix + storage.HashKey(keyName)
log.WithFields(logrus.Fields{
"prefix": "auth-mgr",
"inbound-key": ObfuscateKeyString(keyName),
"key": rawKey,
}).Info("Reset quota for key.")

rateLimiterSentinelKey := RateLimitKeyPrefix + hashKey(keyName) + ".BLOCKED"
rateLimiterSentinelKey := RateLimitKeyPrefix + storage.HashKey(keyName) + ".BLOCKED"
// Clear the rate limiter
go b.store.DeleteRawKey(rateLimiterSentinelKey)
// Fix the raw key
Expand Down
5 changes: 3 additions & 2 deletions coprocess_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/Sirupsen/logrus"

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/storage"
)

// CoProcessDefaultKeyPrefix is used as a key prefix for this CP.
Expand All @@ -33,7 +34,7 @@ func TykStoreData(CKey, CValue *C.char, CTTL C.int) {
value := C.GoString(CValue)
ttl := int64(CTTL)

store := &RedisClusterStorageManager{KeyPrefix: CoProcessDefaultKeyPrefix}
store := &storage.RedisCluster{KeyPrefix: CoProcessDefaultKeyPrefix}
store.SetKey(key, value, ttl)
}

Expand All @@ -42,7 +43,7 @@ func TykStoreData(CKey, CValue *C.char, CTTL C.int) {
func TykGetData(CKey *C.char) *C.char {
key := C.GoString(CKey)

store := &RedisClusterStorageManager{KeyPrefix: CoProcessDefaultKeyPrefix}
store := &storage.RedisCluster{KeyPrefix: CoProcessDefaultKeyPrefix}
// TODO: return error
val, _ := store.GetKey(key)
return C.CString(val)
Expand Down
5 changes: 3 additions & 2 deletions event_handler_webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

type WebHookRequestMethod string
Expand All @@ -35,7 +36,7 @@ const (
type WebHookHandler struct {
conf config.WebHookHandlerConf
template *template.Template // non-nil if Init is run without error
store StorageHandler
store storage.Handler
}

// createConfigObject by default tyk will provide a ma[string]interface{} type as a conf, converting it
Expand Down Expand Up @@ -65,7 +66,7 @@ func (w *WebHookHandler) Init(handlerConf interface{}) error {
return err
}

w.store = &RedisClusterStorageManager{KeyPrefix: "webhook.cache."}
w.store = &storage.RedisCluster{KeyPrefix: "webhook.cache."}
w.store.Connect()

// Pre-load template on init
Expand Down
7 changes: 4 additions & 3 deletions gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

func init() {
Expand Down Expand Up @@ -517,9 +518,9 @@ const extendedPathGatewaySetup = `{
func createSpecTest(t *testing.T, def string) *APISpec {
spec := createDefinitionFromString(def)
tname := t.Name()
redisStore := &RedisClusterStorageManager{KeyPrefix: tname + "-apikey."}
healthStore := &RedisClusterStorageManager{KeyPrefix: tname + "-apihealth."}
orgStore := &RedisClusterStorageManager{KeyPrefix: tname + "-orgKey."}
redisStore := &storage.RedisCluster{KeyPrefix: tname + "-apikey."}
healthStore := &storage.RedisCluster{KeyPrefix: tname + "-apihealth."}
orgStore := &storage.RedisCluster{KeyPrefix: tname + "-orgKey."}
spec.Init(redisStore, redisStore, healthStore, orgStore)
return spec
}
Expand Down
7 changes: 4 additions & 3 deletions host_checker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

var GlobalHostChecker HostCheckerManager

type HostCheckerManager struct {
Id string
store StorageHandler
store storage.Handler
checkerMu sync.Mutex
checker *HostUptimeChecker
stopLoop bool
Expand Down Expand Up @@ -70,7 +71,7 @@ const (
UptimeAnalytics_KEYNAME = "tyk-uptime-analytics"
)

func (hc *HostCheckerManager) Init(store StorageHandler) {
func (hc *HostCheckerManager) Init(store storage.Handler) {
hc.store = store
hc.unhealthyHostList = make(map[string]bool)
hc.resetsInitiated = make(map[string]bool)
Expand Down Expand Up @@ -493,7 +494,7 @@ func (hc *HostCheckerManager) RecordUptimeAnalytics(report HostHealthReport) err
return nil
}

func InitHostCheckManager(store StorageHandler) {
func InitHostCheckManager(store storage.Handler) {
// Already initialized
if GlobalHostChecker.Id != "" {
return
Expand Down
3 changes: 2 additions & 1 deletion host_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

const sampleUptimeTestAPI = `{
Expand Down Expand Up @@ -147,7 +148,7 @@ func TestHostChecker(t *testing.T) {
t.Error("Should set defaults", GlobalHostChecker.checker.checkTimeout)
}

redisStore := GlobalHostChecker.store.(*RedisClusterStorageManager)
redisStore := GlobalHostChecker.store.(*storage.RedisCluster)
if ttl, _ := redisStore.GetKeyTTL(PoolerHostSentinelKeyPrefix + testHttpFailure); int(ttl) != GlobalHostChecker.checker.checkTimeout+1 {
t.Error("HostDown expiration key should be checkTimeout + 1", ttl)
}
Expand Down
2 changes: 1 addition & 1 deletion ldap_auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
ldap "github.com/mavricknz/ldap"
)

// LDAPStorageHandler implements StorageHandler, this is a read-only implementation to access keys from an LDAP service
// LDAPStorageHandler implements storage.Handler, this is a read-only implementation to access keys from an LDAP service
type LDAPStorageHandler struct {
LDAPServer string
LDAPPort uint16
Expand Down
5 changes: 3 additions & 2 deletions le_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/Sirupsen/logrus"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

const LEKeyPrefix = "le_ssl:"
Expand All @@ -17,7 +18,7 @@ func StoreLEState(m *letsencrypt.Manager) {

log.Debug("[SSL] --> Connecting to DB")

store := &RedisClusterStorageManager{KeyPrefix: LEKeyPrefix}
store := &storage.RedisCluster{KeyPrefix: LEKeyPrefix}
connected := store.Connect()

log.Debug("--> Connected to DB")
Expand All @@ -40,7 +41,7 @@ func StoreLEState(m *letsencrypt.Manager) {
func GetLEState(m *letsencrypt.Manager) {
checkKey := "cache"

store := &RedisClusterStorageManager{KeyPrefix: LEKeyPrefix}
store := &storage.RedisCluster{KeyPrefix: LEKeyPrefix}

connected := store.Connect()
log.Debug("[SSL] --> Connected to DB")
Expand Down
Loading

0 comments on commit add9130

Please sign in to comment.