Skip to content

Commit

Permalink
redis support in twemproxy
Browse files Browse the repository at this point in the history
  • Loading branch information
Manju Rajashekhar committed Dec 1, 2012
1 parent 1377dcb commit 6b1982d
Show file tree
Hide file tree
Showing 15 changed files with 3,021 additions and 37 deletions.
3 changes: 2 additions & 1 deletion conf/nutcracker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ alpha:
hash: fnv1a_64
distribution: ketama
auto_eject_hosts: true
redis: true
server_retry_timeout: 2000
server_failure_limit: 1
servers:
- 127.0.0.1:11211:1
- 127.0.0.1:6379:1

beta:
listen: 127.0.0.1:22122
Expand Down
358 changes: 358 additions & 0 deletions notes/redis.md

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions scripts/redis-check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import redis

range=100
factor=32
port=22121

r = redis.StrictRedis(host='localhost', port=port, db=0)

# lrange
print [r.lrange('lfoo', 0, x) for x in xrange(1, range)]
print [r.lpush('lfoo', str(x)*factor) for x in xrange(1, range)]
print [r.lrange('lfoo', 0, x) for x in xrange(1, range)]
print r.delete('lfoo')

# del
print [r.set('foo' + str(x), str(x)*factor) for x in xrange(1, range)]
keys = ['foo' + str(x) for x in xrange(1, range)]
print [r.delete(keys) for x in xrange(1, range)]

# mget
print [r.set('foo' + str(x), str(x)*100) for x in xrange(1, range)]
keys = ['foo' + str(x) for x in xrange(1, range)]
print [r.mget(keys) for x in xrange(1, range)]
470 changes: 470 additions & 0 deletions scripts/redis-check.sh

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/nc_mbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ mbuf_size(struct mbuf *mbuf)
return (uint32_t)(mbuf->end - mbuf->last);
}

/*
* Return the maximum available space size for data in any mbuf. Mbuf cannot
* contain more than 2^32 bytes (4G).
*/
size_t
mbuf_data_size(void)
{
return mbuf_offset;
}

/*
* Insert mbuf at the tail of the mhdr Q
*/
Expand Down
1 change: 1 addition & 0 deletions src/nc_mbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ void mbuf_put(struct mbuf *mbuf);
void mbuf_rewind(struct mbuf *mbuf);
uint32_t mbuf_length(struct mbuf *mbuf);
uint32_t mbuf_size(struct mbuf *mbuf);
size_t mbuf_data_size(void);
void mbuf_insert(struct mhdr *mhdr, struct mbuf *mbuf);
void mbuf_remove(struct mhdr *mhdr, struct mbuf *mbuf);
void mbuf_copy(struct mbuf *mbuf, uint8_t *pos, size_t n);
Expand Down
30 changes: 24 additions & 6 deletions src/nc_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
* + + .
* | | .
* / \ .
* Request Response ...../ nc_mbuf.[ch] (mesage buffers)
* nc_request.c nc_response.c ...../ nc_parse.[ch] (message parser)
* Request Response .../ nc_mbuf.[ch] (mesage buffers)
* nc_request.c nc_response.c .../ nc_memcache.c; nc_redis.c (message parser)
*
* Messages in nutcracker are manipulated by a chain of processing handlers,
* where each handler is responsible for taking the input and producing an
Expand Down Expand Up @@ -237,6 +237,13 @@ _msg_get(void)
msg->nfrag = 0;
msg->frag_id = 0;

msg->narg_start = NULL;
msg->narg_end = NULL;
msg->narg = 0;
msg->rnarg = 0;
msg->rlen = 0;
msg->integer = 0;

msg->err = 0;
msg->error = 0;
msg->ferror = 0;
Expand Down Expand Up @@ -268,7 +275,15 @@ msg_get(struct conn *conn, bool request, bool redis)
msg->redis = redis ? 1 : 0;

if (redis) {
NOT_REACHED();
if (request) {
msg->parser = redis_parse_req;
} else {
msg->parser = redis_parse_rsp;
}
msg->pre_splitcopy = redis_pre_splitcopy;
msg->post_splitcopy = redis_post_splitcopy;
msg->pre_coalesce = redis_pre_coalesce;
msg->post_coalesce = redis_post_coalesce;
} else {
if (request) {
msg->parser = memcache_parse_req;
Expand All @@ -288,12 +303,13 @@ msg_get(struct conn *conn, bool request, bool redis)
}

struct msg *
msg_get_error(err_t err)
msg_get_error(bool redis, err_t err)
{
struct msg *msg;
struct mbuf *mbuf;
int n;
char *errstr = err ? strerror(err) : "unknown";
char *protstr = redis ? "-ERR" : "SERVER_ERROR";

msg = _msg_get();
if (msg == NULL) {
Expand All @@ -310,8 +326,7 @@ msg_get_error(err_t err)
}
mbuf_insert(&msg->mhdr, mbuf);

n = nc_scnprintf(mbuf->last, mbuf->end - mbuf->last, "SERVER_ERROR %s"CRLF,
errstr);
n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "%s %s"CRLF, protstr, errstr);
mbuf->last += n;
msg->mlen = (uint32_t)n;

Expand Down Expand Up @@ -743,6 +758,9 @@ msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg)
TAILQ_REMOVE(&send_msgq, msg, m_tqe);

if (nsent == 0) {
if (msg->mlen == 0) {
conn->send_done(ctx, conn, msg);
}
continue;
}

Expand Down
90 changes: 89 additions & 1 deletion src/nc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,87 @@ typedef enum msg_type {
MSG_RSP_MC_ERROR, /* memcache error responses */
MSG_RSP_MC_CLIENT_ERROR,
MSG_RSP_MC_SERVER_ERROR,
MSG_REQ_REDIS_DEL, /* redis commands - keys */
MSG_REQ_REDIS_EXISTS,
MSG_REQ_REDIS_EXPIRE,
MSG_REQ_REDIS_EXPIREAT,
MSG_REQ_REDIS_PEXPIRE,
MSG_REQ_REDIS_PEXPIREAT,
MSG_REQ_REDIS_PERSIST,
MSG_REQ_REDIS_PTTL,
MSG_REQ_REDIS_TTL,
MSG_REQ_REDIS_TYPE,
MSG_REQ_REDIS_APPEND, /* redis requests - string */
MSG_REQ_REDIS_BITCOUNT,
MSG_REQ_REDIS_DECR,
MSG_REQ_REDIS_DECRBY,
MSG_REQ_REDIS_GET,
MSG_REQ_REDIS_GETBIT,
MSG_REQ_REDIS_GETRANGE,
MSG_REQ_REDIS_GETSET,
MSG_REQ_REDIS_INCR,
MSG_REQ_REDIS_INCRBY,
MSG_REQ_REDIS_INCRBYFLOAT,
MSG_REQ_REDIS_MGET,
MSG_REQ_REDIS_PSETEX,
MSG_REQ_REDIS_SET,
MSG_REQ_REDIS_SETBIT,
MSG_REQ_REDIS_SETEX,
MSG_REQ_REDIS_SETNX,
MSG_REQ_REDIS_SETRANGE,
MSG_REQ_REDIS_STRLEN,
MSG_REQ_REDIS_HDEL, /* redis requests - hashes */
MSG_REQ_REDIS_HEXISTS,
MSG_REQ_REDIS_HGET,
MSG_REQ_REDIS_HGETALL,
MSG_REQ_REDIS_HINCRBY,
MSG_REQ_REDIS_HINCRBYFLOAT,
MSG_REQ_REDIS_HKEYS,
MSG_REQ_REDIS_HLEN,
MSG_REQ_REDIS_HMGET,
MSG_REQ_REDIS_HMSET,
MSG_REQ_REDIS_HSET,
MSG_REQ_REDIS_HSETNX,
MSG_REQ_REDIS_HVALS,
MSG_REQ_REDIS_LINDEX, /* redis requests - lists */
MSG_REQ_REDIS_LINSERT,
MSG_REQ_REDIS_LLEN,
MSG_REQ_REDIS_LPOP,
MSG_REQ_REDIS_LPUSH,
MSG_REQ_REDIS_LPUSHX,
MSG_REQ_REDIS_LRANGE,
MSG_REQ_REDIS_LREM,
MSG_REQ_REDIS_LSET,
MSG_REQ_REDIS_LTRIM,
MSG_REQ_REDIS_RPOP,
MSG_REQ_REDIS_RPUSH,
MSG_REQ_REDIS_RPUSHX,
MSG_REQ_REDIS_SADD, /* redis requests - sets */
MSG_REQ_REDIS_SCARD,
MSG_REQ_REDIS_SISMEMBER,
MSG_REQ_REDIS_SMEMBERS,
MSG_REQ_REDIS_SPOP,
MSG_REQ_REDIS_SRANDMEMBER,
MSG_REQ_REDIS_SREM,
MSG_REQ_REDIS_ZADD, /* redis requests - sorted sets */
MSG_REQ_REDIS_ZCARD,
MSG_REQ_REDIS_ZCOUNT,
MSG_REQ_REDIS_ZINCRBY,
MSG_REQ_REDIS_ZRANGE,
MSG_REQ_REDIS_ZRANGEBYSCORE,
MSG_REQ_REDIS_ZRANK,
MSG_REQ_REDIS_ZREM,
MSG_REQ_REDIS_ZREMRANGEBYRANK,
MSG_REQ_REDIS_ZREMRANGEBYSCORE,
MSG_REQ_REDIS_ZREVRANGE,
MSG_REQ_REDIS_ZREVRANGEBYSCORE,
MSG_REQ_REDIS_ZREVRANK,
MSG_REQ_REDIS_ZSCORE,
MSG_RSP_REDIS_STATUS, /* redis response */
MSG_RSP_REDIS_ERROR,
MSG_RSP_REDIS_INTEGER,
MSG_RSP_REDIS_BULK,
MSG_RSP_REDIS_MULTIBULK,
MSG_SENTINEL
} msg_type_t;

Expand Down Expand Up @@ -94,6 +175,13 @@ struct msg {
uint32_t vlen; /* value length (memcache) */
uint8_t *end; /* end marker (memcache) */

uint8_t *narg_start; /* narg start (redis) */
uint8_t *narg_end; /* narg end (redis) */
uint32_t narg; /* # arguments (redis) */
uint32_t rnarg; /* running # arg used by parsing fsa (redis) */
uint32_t rlen; /* running length in parsing fsa (redis) */
uint32_t integer; /* integer reply value (redis) */

struct msg *frag_owner; /* owner of fragment message */
uint32_t nfrag; /* # fragment */
uint64_t frag_id; /* id of fragmented message */
Expand Down Expand Up @@ -122,7 +210,7 @@ void msg_init(void);
void msg_deinit(void);
struct msg *msg_get(struct conn *conn, bool request, bool redis);
void msg_put(struct msg *msg);
struct msg *msg_get_error(err_t err);
struct msg *msg_get_error(bool redis, err_t err);
void msg_dump(struct msg *msg);
bool msg_empty(struct msg *msg);
rstatus_t msg_recv(struct context *ctx, struct conn *conn);
Expand Down
7 changes: 4 additions & 3 deletions src/nc_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,10 @@ proxy_each_init(void *elem, void *data)
return status;
}

log_debug(LOG_NOTICE, "p %d listening on '%.*s' in pool %"PRIu32" '%.*s' "
"with %"PRIu32" servers", p->sd, pool->addrstr.len,
pool->addrstr.data, pool->idx, pool->name.len, pool->name.data,
log_debug(LOG_NOTICE, "p %d listening on '%.*s' in %s pool %"PRIu32" '%.*s'"
" with %"PRIu32" servers", p->sd, pool->addrstr.len,
pool->addrstr.data, pool->redis ? "redis" : "memcache",
pool->idx, pool->name.len, pool->name.data,
array_n(&pool->server));

return NC_OK;
Expand Down
2 changes: 1 addition & 1 deletion src/nc_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ rsp_make_error(struct context *ctx, struct conn *conn, struct msg *msg)
rsp_put(pmsg);
}

return msg_get_error(err);
return msg_get_error(conn->redis, err);
}

struct msg *
Expand Down
1 change: 1 addition & 0 deletions src/nc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#define LF (uint8_t) 10
#define CR (uint8_t) 13
#define CRLF "\x0d\x0a"
#define CRLF_LEN (sizeof("\x0d\x0a") - 1)

#define NELEMS(a) ((sizeof(a)) / sizeof((a)[0]))

Expand Down
3 changes: 2 additions & 1 deletion src/proto/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ noinst_LIBRARIES = libproto.a
noinst_HEADERS = nc_proto.h

libproto_a_SOURCES = \
nc_memcache.c
nc_memcache.c \
nc_redis.c
3 changes: 1 addition & 2 deletions src/proto/nc_memcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
*/
#define MEMCACHE_MAX_KEY_LENGTH 250


/*
* Return true, if the memcache command is a storage command, otherwise
* return false
Expand Down Expand Up @@ -1299,7 +1298,7 @@ memcache_pre_coalesce(struct msg *r)
* Post-coalesce handler is invoked when the message is a response to
* the fragmented multi vector request - 'get' or 'gets' and all the
* responses to the fragmented request vector has been received and
* the the fragmented request is consider to be done
* the fragmented request is consider to be done
*/
void
memcache_post_coalesce(struct msg *r)
Expand Down
Loading

0 comments on commit 6b1982d

Please sign in to comment.