Skip to content

Commit

Permalink
Adds count to L/RPOP (redis#8179)
Browse files Browse the repository at this point in the history
Adds: `L/RPOP <key> [count]`

Implements no. 2 of the following strategies:

1. Loop on listTypePop - this would result in multiple calls for memory freeing and allocating (see redis@769167a)
2. Iterate the range to build the reply, then call quickListDelRange - this requires two iterations and **is the current choice**
3. Refactor quicklist to have a pop variant of quickListDelRange - probably optimal but more complex

Also:
* There's a historical check for NULL after calling listTypePop that was converted to an assert.
* This refactors common logic shared between LRANGE and the new form of LPOP/RPOP into addListRangeReply (adds test for b/w compat)
* Consequently, it may have made sense to have `LRANGE l -1 -2` and `LRANGE l 9 0` be legit and return a reverse reply. Due to historical reasons that would be, however, a breaking change.
* Added minimal comments to existing commands to adhere to the style, make core dev life easier and get commit karma, naturally.
  • Loading branch information
itamarhaber authored Dec 25, 2020
1 parent e18068d commit f44186e
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 47 deletions.
4 changes: 2 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,11 @@ struct redisCommand redisCommandTable[] = {
"write use-memory @list",
0,NULL,1,1,1,0,0,0},

{"rpop",rpopCommand,2,
{"rpop",rpopCommand,-2,
"write fast @list",
0,NULL,1,1,1,0,0,0},

{"lpop",lpopCommand,2,
{"lpop",lpopCommand,-2,
"write fast @list",
0,NULL,1,1,1,0,0,0},

Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1849,7 +1849,7 @@ void listTypeConvert(robj *subject, int enc);
robj *listTypeDup(robj *o);
void unblockClientWaitingData(client *c);
void popGenericCommand(client *c, int where);
void listElementsRemoved(client *c, robj *key, int where, robj *o);
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count);

/* MULTI/EXEC/WATCH... */
void unwatchAllKeys(client *c);
Expand Down
156 changes: 112 additions & 44 deletions src/t_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ robj *listTypeDup(robj *o) {
lobj->encoding = OBJ_ENCODING_QUICKLIST;
break;
default:
serverPanic("Wrong encoding.");
serverPanic("Unknown list encoding");
break;
}
return lobj;
Expand All @@ -216,6 +216,7 @@ robj *listTypeDup(robj *o) {
* List Commands
*----------------------------------------------------------------------------*/

/* Implements LPUSH/RPUSH. */
void pushGenericCommand(client *c, int where) {
int j, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
Expand Down Expand Up @@ -244,14 +245,17 @@ void pushGenericCommand(client *c, int where) {
server.dirty += pushed;
}

/* LPUSH <key> <element> [<element> ...] */
void lpushCommand(client *c) {
pushGenericCommand(c,LIST_HEAD);
}

/* RPUSH <key> <element> [<element> ...] */
void rpushCommand(client *c) {
pushGenericCommand(c,LIST_TAIL);
}

/* Implements LPUSHX/RPUSHX. */
void pushxGenericCommand(client *c, int where) {
int j, pushed = 0;
robj *subject;
Expand All @@ -274,14 +278,17 @@ void pushxGenericCommand(client *c, int where) {
server.dirty += pushed;
}

/* LPUSHX <key> <element> [<element> ...] */
void lpushxCommand(client *c) {
pushxGenericCommand(c,LIST_HEAD);
}

/* RPUSH <key> <element> [<element> ...] */
void rpushxCommand(client *c) {
pushxGenericCommand(c,LIST_TAIL);
}

/* LINSERT <key> (BEFORE|AFTER) <pivot> <element> */
void linsertCommand(client *c) {
int where;
robj *subject;
Expand Down Expand Up @@ -326,12 +333,14 @@ void linsertCommand(client *c) {
addReplyLongLong(c,listTypeLength(subject));
}

/* LLEN <key> */
void llenCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
addReplyLongLong(c,listTypeLength(o));
}

/* LINDEX <key> <index> */
void lindexCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
Expand Down Expand Up @@ -359,6 +368,7 @@ void lindexCommand(client *c) {
}
}

/* LSET <key> <index> <element> */
void lsetCommand(client *c) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
Expand All @@ -385,7 +395,53 @@ void lsetCommand(client *c) {
}
}

void listElementsRemoved(client *c, robj *key, int where, robj *o) {
/* A helper for replying with a list's range between the inclusive start and end
* indexes as multi-bulk, with support for negative indexes. Note that start
* must be less than end or an empty array is returned. When the reverse
* argument is set to a non-zero value, the reply is reversed so that elements
* are returned from end to start. */
void addListRangeReply(client *c, robj *o, long start, long end, int reverse) {
long rangelen, llen = listTypeLength(o);

/* Convert negative indexes. */
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;

/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
addReply(c,shared.emptyarray);
return;
}
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;

/* Return the result in form of a multi-bulk reply */
addReplyArrayLen(c,rangelen);
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
int from = reverse ? end : start;
int direction = reverse ? LIST_HEAD : LIST_TAIL;
listTypeIterator *iter = listTypeInitIterator(o,from,direction);

while(rangelen--) {
listTypeEntry entry;
listTypeNext(iter, &entry);
quicklistEntry *qe = &entry.entry;
if (qe->value) {
addReplyBulkCBuffer(c,qe->value,qe->sz);
} else {
addReplyBulkLongLong(c,qe->longval);
}
}
listTypeReleaseIterator(iter);
} else {
serverPanic("Unknown list encoding");
}
}

/* A housekeeping helper for list elements popping tasks. */
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count) {
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";

notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id);
Expand All @@ -394,78 +450,84 @@ void listElementsRemoved(client *c, robj *key, int where, robj *o) {
dbDelete(c->db, key);
}
signalModifiedKey(c, c->db, key);
server.dirty++;
server.dirty += count;
}

/* Implements the generic list pop operation for LPOP/RPOP.
* The where argument specifies which end of the list is operated on. An
* optional count may be provided as the third argument of the client's
* command. */
void popGenericCommand(client *c, int where) {
long count = 0;
robj *value;

if (c->argc > 3) {
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return;
} else if (c->argc == 3) {
/* Parse the optional count argument. */
if (getPositiveLongFromObjectOrReply(c,c->argv[2],&count,NULL) != C_OK)
return;
if (count == 0) {
/* Fast exit path. */
addReplyNullArray(c);
return;
}
}

robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp]);
if (o == NULL || checkType(c, o, OBJ_LIST))
return;

robj *value = listTypePop(o, where);
if (value == NULL) {
addReplyNull(c);
} else {
if (!count) {
/* Pop a single element. This is POP's original behavior that replies
* with a bulk string. */
value = listTypePop(o,where);
serverAssert(value != NULL);
addReplyBulk(c,value);
decrRefCount(value);
listElementsRemoved(c,c->argv[1],where,o);
listElementsRemoved(c,c->argv[1],where,o,1);
} else {
/* Pop a range of elements. An addition to the original POP command,
* which replies with a multi-bulk. */
long llen = listTypeLength(o);
long rangelen = (count > llen) ? llen : count;
long rangestart = (where == LIST_HEAD) ? 0 : -rangelen;
long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1;
int reverse = (where == LIST_HEAD) ? 0 : 1;

addListRangeReply(c,o,rangestart,rangeend,reverse);
quicklistDelRange(o->ptr,rangestart,rangelen);
listElementsRemoved(c,c->argv[1],where,o,rangelen);
}
}

/* LPOP <key> [count] */
void lpopCommand(client *c) {
popGenericCommand(c,LIST_HEAD);
}

/* RPOP <key> [count] */
void rpopCommand(client *c) {
popGenericCommand(c,LIST_TAIL);
}

/* LRANGE <key> <start> <stop> */
void lrangeCommand(client *c) {
robj *o;
long start, end, llen, rangelen;
long start, end;

if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL
|| checkType(c,o,OBJ_LIST)) return;
llen = listTypeLength(o);

/* convert negative indexes */
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;

/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
addReply(c,shared.emptyarray);
return;
}
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;

/* Return the result in form of a multi-bulk reply */
addReplyArrayLen(c,rangelen);
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);

while(rangelen--) {
listTypeEntry entry;
listTypeNext(iter, &entry);
quicklistEntry *qe = &entry.entry;
if (qe->value) {
addReplyBulkCBuffer(c,qe->value,qe->sz);
} else {
addReplyBulkLongLong(c,qe->longval);
}
}
listTypeReleaseIterator(iter);
} else {
serverPanic("List encoding is not QUICKLIST!");
}
addListRangeReply(c,o,start,end,0);
}

/* LTRIM <key> <start> <stop> */
void ltrimCommand(client *c) {
robj *o;
long start, end, llen, ltrim, rtrim;
Expand Down Expand Up @@ -629,6 +691,7 @@ void lposCommand(client *c) {
}
}

/* LREM <key> <count> <element> */
void lremCommand(client *c) {
robj *subject, *obj;
obj = c->argv[3];
Expand Down Expand Up @@ -756,6 +819,7 @@ void lmoveGenericCommand(client *c, int wherefrom, int whereto) {
}
}

/* LMOVE <source> <destination> (LEFT|RIGHT) (LEFT|RIGHT) */
void lmoveCommand(client *c) {
int wherefrom, whereto;
if (getListPositionFromObjectOrReply(c,c->argv[3],&wherefrom)
Expand Down Expand Up @@ -888,7 +952,7 @@ void blockingPopGenericCommand(client *c, int where) {
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
decrRefCount(value);
listElementsRemoved(c,c->argv[j],where,o);
listElementsRemoved(c,c->argv[j],where,o,1);

/* Replicate it as an [LR]POP instead of B[LR]POP. */
rewriteClientCommandVector(c,2,
Expand All @@ -912,10 +976,12 @@ void blockingPopGenericCommand(client *c, int where) {
blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,&pos,NULL);
}

/* BLPOP <key> [<key> ...] <timeout> */
void blpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_HEAD);
}

/* BLPOP <key> [<key> ...] <timeout> */
void brpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_TAIL);
}
Expand All @@ -942,6 +1008,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou
}
}

/* BLMOVE <source> <destination> (LEFT|RIGHT) (LEFT|RIGHT) <timeout> */
void blmoveCommand(client *c) {
mstime_t timeout;
int wherefrom, whereto;
Expand All @@ -954,6 +1021,7 @@ void blmoveCommand(client *c) {
blmoveGenericCommand(c,wherefrom,whereto,timeout);
}

/* BRPOPLPUSH <source> <destination> <timeout> */
void brpoplpushCommand(client *c) {
mstime_t timeout;
if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
Expand Down
17 changes: 17 additions & 0 deletions tests/unit/type/list.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,17 @@ start_server {
test {R/LPOP against empty list} {
r lpop non-existing-list
} {}

test {R/LPOP with the optional count argument} {
assert_equal 7 [r lpush listcount aa bb cc dd ee ff gg]
assert_equal {} [r lpop listcount 0]
assert_equal {gg} [r lpop listcount 1]
assert_equal {ff ee} [r lpop listcount 2]
assert_equal {aa bb} [r rpop listcount 2]
assert_equal {cc} [r rpop listcount 1]
assert_equal {dd} [r rpop listcount 123]
assert_error "*ERR*range*" {r lpop forbarqaz -123}
}

test {Variadic RPUSH/LPUSH} {
r del mylist
Expand Down Expand Up @@ -947,6 +958,12 @@ start_server {
assert_equal {} [r lrange nosuchkey 0 1]
}

test {LRANGE with start > end yields an empty array for backward compatibility} {
create_list mylist "1 2 3"
assert_equal {} [r lrange mylist 1 0]
assert_equal {} [r lrange mylist -1 -2]
}

foreach {type large} [array get largevalue] {
proc trim_list {type min max} {
upvar 1 large large
Expand Down

0 comments on commit f44186e

Please sign in to comment.