Skip to content

Commit

Permalink
Cluster: redirection refactoring + handling of blocked clients.
Browse files Browse the repository at this point in the history
There was a bug in Redis Cluster caused by clients blocked in a blocking
list pop operation, for keys no longer handled by the instance, or
in a condition where the cluster became down after the client blocked.

A typical situation is:

1) BLPOP <somekey> 0
2) <somekey> hash slot is resharded to another master.

The client will block forever int this case.

A symmentrical non-cluster-specific bug happens when an instance is
turned from master to slave. In that case it is more serious since this
will desynchronize data between slaves and masters. This other bug was
discovered as a side effect of thinking about the bug explained and
fixed in this commit, but will be fixed in a separated commit.
  • Loading branch information
antirez committed Mar 24, 2015
1 parent eff68bd commit 9b7f8b1
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 29 deletions.
2 changes: 2 additions & 0 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
* When implementing a new type of blocking opeation, the implementation
* should modify unblockClient() and replyToBlockedClientTimedOut() in order
* to handle the btype-specific behavior of this two functions.
* If the blocking operation waits for certain keys to change state, the
* clusterRedirectBlockedClientIfNeeded() function should also be updated.
*/

#include "redis.h"
Expand Down
90 changes: 85 additions & 5 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -4768,10 +4768,10 @@ void readwriteCommand(redisClient *c) {
* belonging to the same slot, but the slot is not stable (in migration or
* importing state, likely because a resharding is in progress).
*
* REDIS_CLUSTER_REDIR_DOWN if the request addresses a slot which is not
* bound to any node. In this case the cluster global state should be already
* "down" but it is fragile to rely on the update of the global state, so
* we also handle it here. */
* REDIS_CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
* not bound to any node. In this case the cluster global state should be
* already "down" but it is fragile to rely on the update of the global state,
* so we also handle it here. */
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
clusterNode *n = NULL;
robj *firstkey = NULL;
Expand Down Expand Up @@ -4833,7 +4833,7 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
if (n == NULL) {
getKeysFreeResult(keyindex);
if (error_code)
*error_code = REDIS_CLUSTER_REDIR_DOWN;
*error_code = REDIS_CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
}

Expand Down Expand Up @@ -4925,3 +4925,83 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
return n;
}

/* Send the client the right redirection code, according to error_code
* that should be set to one of REDIS_CLUSTER_REDIR_* macros.
*
* If REDIS_CLUSTER_REDIR_ASK or REDIS_CLUSTER_REDIR_MOVED error codes
* are used, then the node 'n' should not be NULL, but should be the
* node we want to mention in the redirection. Moreover hashslot should
* be set to the hash slot that caused the redirection. */
void clusterRedirectClient(redisClient *c, clusterNode *n, int hashslot, int error_code) {
if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
/* The request spawns mutliple keys in the same slot,
* but the slot is not "stable" currently as there is
* a migration or import in progress. */
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_DOWN_STATE) {
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_DOWN_UNBOUND) {
addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_MOVED ||
error_code == REDIS_CLUSTER_REDIR_ASK)
{
addReplySds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d\r\n",
(error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));
} else {
redisPanic("getNodeByQuery() unknown error.");
}
}

/* This function is called by the function processing clients incrementally
* to detect timeouts, in order to handle the following case:
*
* 1) A client blocks with BLPOP or similar blocking operation.
* 2) The master migrates the hash slot elsewhere or turns into a slave.
* 3) The client may remain blocked forever (or up to the max timeout time)
* waiting for a key change that will never happen.
*
* If the client is found to be blocked into an hash slot this node no
* longer handles, the client is sent a redirection error, and the function
* returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(redisClient *c) {
if (c->flags & REDIS_BLOCKED && c->btype == REDIS_BLOCKED_LIST) {
dictEntry *de;
dictIterator *di;

/* If the cluster is down, unblock the client with the right error. */
if (server.cluster->state == REDIS_CLUSTER_FAIL) {
clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
return 1;
}

di = dictGetIterator(c->bpop.keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
clusterNode *node = server.cluster->slots[slot];

/* We send an error and unblock the client if:
* 1) The slot is unassigned, emitting a cluster down error.
* 2) The slot is not handled by this node, nor being imported. */
if (node != myself &&
server.cluster->importing_slots_from[slot] == NULL)
{
if (node == NULL) {
clusterRedirectClient(c,NULL,0,
REDIS_CLUSTER_REDIR_DOWN_UNBOUND);
} else {
clusterRedirectClient(c,node,slot,
REDIS_CLUSTER_REDIR_MOVED);
}
return 1;
}
}
dictReleaseIterator(di);
}
return 0;
}
9 changes: 6 additions & 3 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@

/* Redirection errors returned by getNodeByQuery(). */
#define REDIS_CLUSTER_REDIR_NONE 0 /* Node can serve the request. */
#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1 /* Keys in different slots. */
#define REDIS_CLUSTER_REDIR_UNSTABLE 2 /* Keys in slot resharding. */
#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1 /* -CROSSSLOT request. */
#define REDIS_CLUSTER_REDIR_UNSTABLE 2 /* -TRYAGAIN redirection required */
#define REDIS_CLUSTER_REDIR_ASK 3 /* -ASK redirection required. */
#define REDIS_CLUSTER_REDIR_MOVED 4 /* -MOVED redirection required. */
#define REDIS_CLUSTER_REDIR_DOWN 5 /* -CLUSTERDOWN error. */
#define REDIS_CLUSTER_REDIR_DOWN_STATE 5 /* -CLUSTERDOWN, global state. */
#define REDIS_CLUSTER_REDIR_DOWN_UNBOUND 6 /* -CLUSTERDOWN, unbound slot. */

struct clusterNode;

Expand Down Expand Up @@ -249,5 +250,7 @@ typedef struct {

/* ---------------------- API exported outside cluster.c -------------------- */
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
int clusterRedirectBlockedClientIfNeeded(redisClient *c);
void clusterRedirectClient(redisClient *c, clusterNode *n, int hashslot, int error_code);

#endif /* __REDIS_CLUSTER_H */
30 changes: 9 additions & 21 deletions src/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -926,8 +926,14 @@ int clientsCronHandleTimeout(redisClient *c) {
mstime_t now_ms = mstime();

if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
/* Handle blocking operation specific timeout. */
replyToBlockedClientTimedOut(c);
unblockClient(c);
} else if (server.cluster_enabled) {
/* Cluster: handle unblock & redirect of clients blocked
* into keys no longer served by this server. */
if (clusterRedirectBlockedClientIfNeeded(c))
unblockClient(c);
}
}
return 0;
Expand Down Expand Up @@ -2207,32 +2213,14 @@ int processCommand(redisClient *c) {

if (server.cluster->state != REDIS_CLUSTER_OK) {
flagTransaction(c);
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
clusterRedirectClient(c,NULL,0,REDIS_CLUSTER_REDIR_DOWN_STATE);
return REDIS_OK;
} else {
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
if (n == NULL) {
if (n == NULL || n != server.cluster->myself) {
flagTransaction(c);
if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
/* The request spawns mutliple keys in the same slot,
* but the slot is not "stable" currently as there is
* a migration or import in progress. */
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
} else if (error_code == REDIS_CLUSTER_REDIR_DOWN) {
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Hash slot is unbound\r\n"));
} else {
redisPanic("getNodeByQuery() unknown error.");
}
return REDIS_OK;
} else if (n != server.cluster->myself) {
flagTransaction(c);
addReplySds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d\r\n",
(error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));
clusterRedirectClient(c,n,hashslot,error_code);
return REDIS_OK;
}
}
Expand Down

0 comments on commit 9b7f8b1

Please sign in to comment.