Skip to content

Commit

Permalink
Removed internal refs
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed Dec 3, 2016
1 parent 9658c8b commit d16cb2b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 38 deletions.
66 changes: 29 additions & 37 deletions rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func ClearRPCClients() {

func RPCKeepAliveCheck(r *RPCStorageHandler) {
// Only run when connected
if RPCClientIsConnected && r.cache != nil {
if RPCClientIsConnected {
// Make sure the auth back end is still alive
c1 := make(chan string, 1)

Expand Down Expand Up @@ -108,13 +108,10 @@ func RPCKeepAliveCheck(r *RPCStorageHandler) {

// RPCStorageHandler is a storage manager that uses the redis database.
type RPCStorageHandler struct {
RPCClient *gorpc.Client
Client *gorpc.DispatcherClient
KeyPrefix string
HashKeys bool
UserKey string
Address string
cache *cache.Cache
killChan chan int
Killed bool
Connected bool
Expand Down Expand Up @@ -164,32 +161,27 @@ func (r *RPCStorageHandler) Connect() bool {

if RPCClientIsConnected {
log.Debug("Using RPC singleton for connection")
r.RPCClient = RPCCLientSingleton
r.Client = RPCFuncClientSingleton
r.cache = RPCGlobalCache
return true
}

// RPC Client is unset
// Set up the cache
log.Info("Setting new RPC connection!")
r.cache = RPCGlobalCache
RPCCLientSingleton = gorpc.NewTCPClient(r.Address)
r.RPCClient = RPCCLientSingleton

if log.Level != logrus.DebugLevel {
gorpc.SetErrorLogger(gorpc.NilErrorLogger)
}

r.RPCClient.OnConnect = r.OnConnectFunc
r.RPCClient.Conns = 50
r.RPCClient.Start()
RPCCLientSingleton.OnConnect = r.OnConnectFunc
RPCCLientSingleton.Conns = 50
RPCCLientSingleton.Start()
d := GetDispatcher()

if RPCFuncClientSingleton == nil {
RPCFuncClientSingleton = d.NewFuncClient(r.RPCClient)
RPCFuncClientSingleton = d.NewFuncClient(RPCCLientSingleton)
}
r.Client = RPCFuncClientSingleton

r.Login()

if !r.SuppressRegister {
Expand All @@ -207,7 +199,7 @@ func (r *RPCStorageHandler) OnConnectFunc(remoteAddr string, rwc io.ReadWriteClo

func (r *RPCStorageHandler) Disconnect() bool {
if RPCClientIsConnected {
go r.RPCClient.Stop()
go RPCCLientSingleton.Stop()
RPCClientIsConnected = false
RPCCLientRWMutex.Lock()
delete(RPCClients, r.ID)
Expand Down Expand Up @@ -268,7 +260,7 @@ func (r *RPCStorageHandler) GroupLogin() {
UserKey: r.UserKey,
GroupID: config.SlaveOptions.GroupID,
}
ok, err := r.Client.CallTimeout("LoginWithGroup", groupLoginData, GlobalRPCCallTimeout)
ok, err := RPCFuncClientSingleton.CallTimeout("LoginWithGroup", groupLoginData, GlobalRPCCallTimeout)
if err != nil {
log.Error("RPC Login failed: ", err)
r.ReAttemptLogin(err)
Expand Down Expand Up @@ -297,7 +289,7 @@ func (r *RPCStorageHandler) Login() {
return
}

ok, err := r.Client.CallTimeout("Login", r.UserKey, GlobalRPCCallTimeout)
ok, err := RPCFuncClientSingleton.CallTimeout("Login", r.UserKey, GlobalRPCCallTimeout)
if err != nil {
log.Error("RPC Login failed: ", err)
r.ReAttemptLogin(err)
Expand All @@ -322,7 +314,7 @@ func (r *RPCStorageHandler) GetKey(keyName string) (string, error) {
// Check the cache first
if config.SlaveOptions.EnableRPCCache {
log.Debug("Using cache for: ", keyName)
cachedVal, found := r.cache.Get(r.fixKey(keyName))
cachedVal, found := RPCGlobalCache.Get(r.fixKey(keyName))
log.Debug("--> Found? ", found)
if found {
elapsed := time.Since(start)
Expand All @@ -333,7 +325,7 @@ func (r *RPCStorageHandler) GetKey(keyName string) (string, error) {
}

// Not cached
value, err := r.Client.CallTimeout("GetKey", r.fixKey(keyName), GlobalRPCCallTimeout)
value, err := RPCFuncClientSingleton.CallTimeout("GetKey", r.fixKey(keyName), GlobalRPCCallTimeout)

if err != nil {
if r.IsAccessError(err) {
Expand All @@ -349,7 +341,7 @@ func (r *RPCStorageHandler) GetKey(keyName string) (string, error) {

if config.SlaveOptions.EnableRPCCache {
// Cache it
r.cache.Set(r.fixKey(keyName), value, cache.DefaultExpiration)
RPCGlobalCache.Set(r.fixKey(keyName), value, cache.DefaultExpiration)
}

return value.(string), nil
Expand All @@ -363,7 +355,7 @@ func (r *RPCStorageHandler) GetRawKey(keyName string) (string, error) {

func (r *RPCStorageHandler) GetExp(keyName string) (int64, error) {
log.Debug("GetExp called")
value, err := r.Client.CallTimeout("GetExp", r.fixKey(keyName), GlobalRPCCallTimeout)
value, err := RPCFuncClientSingleton.CallTimeout("GetExp", r.fixKey(keyName), GlobalRPCCallTimeout)

if err != nil {
if r.IsAccessError(err) {
Expand All @@ -387,7 +379,7 @@ func (r *RPCStorageHandler) SetKey(keyName string, sessionState string, timeout
Timeout: timeout,
}

_, err := r.Client.CallTimeout("SetKey", ibd, GlobalRPCCallTimeout)
_, err := RPCFuncClientSingleton.CallTimeout("SetKey", ibd, GlobalRPCCallTimeout)

if r.IsAccessError(err) {
r.Login()
Expand All @@ -407,7 +399,7 @@ func (r *RPCStorageHandler) SetRawKey(keyName string, sessionState string, timeo
// Decrement will decrement a key in redis
func (r *RPCStorageHandler) Decrement(keyName string) {
log.Warning("Decrement called")
_, err := r.Client.CallTimeout("Decrement", keyName, GlobalRPCCallTimeout)
_, err := RPCFuncClientSingleton.CallTimeout("Decrement", keyName, GlobalRPCCallTimeout)
if r.IsAccessError(err) {
r.Login()
r.Decrement(keyName)
Expand All @@ -423,7 +415,7 @@ func (r *RPCStorageHandler) IncrememntWithExpire(keyName string, expire int64) i
Expire: expire,
}

val, err := r.Client.CallTimeout("IncrememntWithExpire", ibd, GlobalRPCCallTimeout)
val, err := RPCFuncClientSingleton.CallTimeout("IncrememntWithExpire", ibd, GlobalRPCCallTimeout)

if r.IsAccessError(err) {
r.Login()
Expand Down Expand Up @@ -453,7 +445,7 @@ func (r *RPCStorageHandler) GetKeysAndValuesWithFilter(filter string) map[string
searchStr := r.KeyPrefix + r.hashKey(filter) + "*"
log.Debug("[STORE] Getting list by: ", searchStr)

kvPair, err := r.Client.CallTimeout("GetKeysAndValuesWithFilter", searchStr, GlobalRPCCallTimeout)
kvPair, err := RPCFuncClientSingleton.CallTimeout("GetKeysAndValuesWithFilter", searchStr, GlobalRPCCallTimeout)

if r.IsAccessError(err) {
r.Login()
Expand All @@ -473,7 +465,7 @@ func (r *RPCStorageHandler) GetKeysAndValuesWithFilter(filter string) map[string
func (r *RPCStorageHandler) GetKeysAndValues() map[string]string {

searchStr := r.KeyPrefix + "*"
kvPair, err := r.Client.CallTimeout("GetKeysAndValues", searchStr, GlobalRPCCallTimeout)
kvPair, err := RPCFuncClientSingleton.CallTimeout("GetKeysAndValues", searchStr, GlobalRPCCallTimeout)

if r.IsAccessError(err) {
r.Login()
Expand All @@ -494,7 +486,7 @@ func (r *RPCStorageHandler) DeleteKey(keyName string) bool {

log.Debug("DEL Key was: ", keyName)
log.Debug("DEL Key became: ", r.fixKey(keyName))
ok, err := r.Client.CallTimeout("DeleteKey", r.fixKey(keyName), GlobalRPCCallTimeout)
ok, err := RPCFuncClientSingleton.CallTimeout("DeleteKey", r.fixKey(keyName), GlobalRPCCallTimeout)

if r.IsAccessError(err) {
r.Login()
Expand All @@ -509,7 +501,7 @@ func (r *RPCStorageHandler) DeleteKey(keyName string) bool {

// DeleteKey will remove a key from the database without prefixing, assumes user knows what they are doing
func (r *RPCStorageHandler) DeleteRawKey(keyName string) bool {
ok, err := r.Client.CallTimeout("DeleteRawKey", keyName, GlobalRPCCallTimeout)
ok, err := RPCFuncClientSingleton.CallTimeout("DeleteRawKey", keyName, GlobalRPCCallTimeout)

if r.IsAccessError(err) {
r.Login()
Expand All @@ -531,7 +523,7 @@ func (r *RPCStorageHandler) DeleteKeys(keys []string) bool {
}

log.Debug("Deleting: ", asInterface)
ok, err := r.Client.CallTimeout("DeleteKeys", asInterface, GlobalRPCCallTimeout)
ok, err := RPCFuncClientSingleton.CallTimeout("DeleteKeys", asInterface, GlobalRPCCallTimeout)

if r.IsAccessError(err) {
r.Login()
Expand Down Expand Up @@ -581,7 +573,7 @@ func (r *RPCStorageHandler) AppendToSet(keyName string, value string) {
Value: value,
}

_, err := r.Client.CallTimeout("AppendToSet", ibd, GlobalRPCCallTimeout)
_, err := RPCFuncClientSingleton.CallTimeout("AppendToSet", ibd, GlobalRPCCallTimeout)
if r.IsAccessError(err) {
r.Login()
r.AppendToSet(keyName, value)
Expand All @@ -599,7 +591,7 @@ func (r *RPCStorageHandler) SetRollingWindow(keyName string, per int64, val stri
Expire: -1,
}

intVal, err := r.Client.CallTimeout("SetRollingWindow", ibd, GlobalRPCCallTimeout)
intVal, err := RPCFuncClientSingleton.CallTimeout("SetRollingWindow", ibd, GlobalRPCCallTimeout)
if r.IsAccessError(err) {
r.Login()
return r.SetRollingWindow(keyName, per, val)
Expand All @@ -626,7 +618,7 @@ func (r *RPCStorageHandler) SetRollingWindowPipeline(keyName string, per int64,
Expire: -1,
}

intVal, err := r.Client.CallTimeout("SetRollingWindow", ibd, GlobalRPCCallTimeout)
intVal, err := RPCFuncClientSingleton.CallTimeout("SetRollingWindow", ibd, GlobalRPCCallTimeout)
if r.IsAccessError(err) {
r.Login()
return r.SetRollingWindow(keyName, per, val)
Expand Down Expand Up @@ -674,7 +666,7 @@ func (r *RPCStorageHandler) GetApiDefinitions(orgId string, tags []string) strin
Tags: tags,
}

defString, err := r.Client.CallTimeout("GetApiDefinitions", dr, GlobalRPCCallTimeout)
defString, err := RPCFuncClientSingleton.CallTimeout("GetApiDefinitions", dr, GlobalRPCCallTimeout)

if err != nil {
if r.IsAccessError(err) {
Expand All @@ -695,7 +687,7 @@ func (r *RPCStorageHandler) GetApiDefinitions(orgId string, tags []string) strin

// GetPolicies will pull Policies from the RPC server
func (r *RPCStorageHandler) GetPolicies(orgId string) string {
defString, err := r.Client.CallTimeout("GetPolicies", orgId, GlobalRPCCallTimeout)
defString, err := RPCFuncClientSingleton.CallTimeout("GetPolicies", orgId, GlobalRPCCallTimeout)
if err != nil {
if r.IsAccessError(err) {
r.Login()
Expand All @@ -714,7 +706,7 @@ func (r *RPCStorageHandler) GetPolicies(orgId string) string {
// CheckForReload will start a long poll
func (r *RPCStorageHandler) CheckForReload(orgId string) {
log.Debug("[RPC STORE] Check Reload called...")
reload, err := r.Client.CallTimeout("CheckReload", orgId, time.Second*60)
reload, err := RPCFuncClientSingleton.CallTimeout("CheckReload", orgId, time.Second*60)
if err != nil {
if r.IsAccessError(err) {
log.Warning("[RPC STORE] CheckReload: Not logged in")
Expand Down Expand Up @@ -756,14 +748,14 @@ func (r *RPCStorageHandler) CheckForKeyspaceChanges(orgId string) {
var err error

if config.SlaveOptions.GroupID == "" {
keys, err = r.Client.CallTimeout("GetKeySpaceUpdate", orgId, GlobalRPCCallTimeout)
keys, err = RPCFuncClientSingleton.CallTimeout("GetKeySpaceUpdate", orgId, GlobalRPCCallTimeout)
} else {

grpReq := GroupKeySpaceRequest{
OrgID: orgId,
GroupID: config.SlaveOptions.GroupID,
}
keys, err = r.Client.CallTimeout("GetGroupKeySpaceUpdate", grpReq, GlobalRPCCallTimeout)
keys, err = RPCFuncClientSingleton.CallTimeout("GetGroupKeySpaceUpdate", grpReq, GlobalRPCCallTimeout)
}

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
package main
var VERSION string = "v2.3.0.26"
var VERSION string = "v2.3.0.27"

0 comments on commit d16cb2b

Please sign in to comment.