Skip to content

Commit

Permalink
Merge pull request twitter#294 from arnecls/RedisSelectDB
Browse files Browse the repository at this point in the history
Redis SELECT on connect
  • Loading branch information
manjuraj committed Dec 22, 2014
2 parents 4d9a8a0 + b103e75 commit 37ca913
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 1 deletion.
11 changes: 10 additions & 1 deletion src/nc_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ static struct command conf_commands[] = {
{ string("redis_auth"),
conf_set_string,
offsetof(struct conf_pool, redis_auth) },
{ string("redis_db"),
conf_set_num,
offsetof(struct conf_pool, redis_db) },

{ string("preconnect"),
conf_set_bool,
Expand Down Expand Up @@ -189,6 +192,7 @@ conf_pool_init(struct conf_pool *cp, struct string *name)
cp->client_connections = CONF_UNSET_NUM;

cp->redis = CONF_UNSET_NUM;
cp->redis_db = CONF_UNSET_NUM;
cp->preconnect = CONF_UNSET_NUM;
cp->auto_eject_hosts = CONF_UNSET_NUM;
cp->server_connections = CONF_UNSET_NUM;
Expand Down Expand Up @@ -278,6 +282,7 @@ conf_pool_each_transform(void *elem, void *data)

sp->redis = cp->redis ? 1 : 0;
sp->redis_auth = cp->redis_auth;
sp->redis_db = cp->redis_db;
sp->timeout = cp->timeout;
sp->backlog = cp->backlog;

Expand Down Expand Up @@ -1171,7 +1176,7 @@ conf_validate_server(struct conf *cf, struct conf_pool *cp)

if (string_compare(&cs1->name, &cs2->name) == 0) {
log_error("conf: pool '%.*s' has servers with same name '%.*s'",
cp->name.len, cp->name.data, cs1->name.len,
cp->name.len, cp->name.data, cs1->name.len,
cs1->name.data);
valid = false;
break;
Expand Down Expand Up @@ -1221,6 +1226,10 @@ conf_validate_pool(struct conf *cf, struct conf_pool *cp)
cp->redis = CONF_DEFAULT_REDIS;
}

if (cp->redis_db == CONF_UNSET_NUM) {
cp->redis_db = CONF_DEFAULT_REDIS_DB;
}

if (cp->preconnect == CONF_UNSET_NUM) {
cp->preconnect = CONF_DEFAULT_PRECONNECT;
}
Expand Down
2 changes: 2 additions & 0 deletions src/nc_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#define CONF_DEFAULT_LISTEN_BACKLOG 512
#define CONF_DEFAULT_CLIENT_CONNECTIONS 0
#define CONF_DEFAULT_REDIS false
#define CONF_DEFAULT_REDIS_DB 0
#define CONF_DEFAULT_PRECONNECT false
#define CONF_DEFAULT_AUTO_EJECT_HOSTS false
#define CONF_DEFAULT_SERVER_RETRY_TIMEOUT 30 * 1000 /* in msec */
Expand Down Expand Up @@ -82,6 +83,7 @@ struct conf_pool {
int client_connections; /* client_connections: */
int redis; /* redis: */
struct string redis_auth; /* redis auth password */
int redis_db; /* redis_db: */
int preconnect; /* preconnect: */
int auto_eject_hosts; /* auto_eject_hosts: */
int server_connections; /* server_connections: */
Expand Down
4 changes: 4 additions & 0 deletions src/nc_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ conn_get(void *owner, bool client, bool redis)
conn->dequeue_inq = NULL;
conn->enqueue_outq = req_client_enqueue_omsgq;
conn->dequeue_outq = req_client_dequeue_omsgq;
conn->init = NULL;
conn->swallow_msg = NULL;

ncurr_cconn++;
} else {
Expand Down Expand Up @@ -243,6 +245,8 @@ conn_get(void *owner, bool client, bool redis)
conn->dequeue_inq = req_server_dequeue_imsgq;
conn->enqueue_outq = req_server_enqueue_omsgq;
conn->dequeue_outq = req_server_dequeue_omsgq;
conn->init = redis_conn_init;
conn->swallow_msg = redis_swallow_msg;
}

conn->ref(conn, owner);
Expand Down
4 changes: 4 additions & 0 deletions src/nc_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ typedef void (*conn_ref_t)(struct conn *, void *);
typedef void (*conn_unref_t)(struct conn *);

typedef void (*conn_msgq_t)(struct context *, struct conn *, struct msg *);
typedef void (*conn_initialize_t)(struct context *ctx, struct conn *, struct server *server);
typedef void (*conn_swallow_msg_t)(struct conn *, struct msg *, struct msg *);

struct conn {
TAILQ_ENTRY(conn) conn_tqe; /* link in server_pool / server / free q */
Expand All @@ -58,6 +60,8 @@ struct conn {
conn_send_done_t send_done; /* write done handler */
conn_close_t close; /* close handler */
conn_active_t active; /* active? handler */
conn_initialize_t init; /* connection initialize handler */
conn_swallow_msg_t swallow_msg; /* react on messages to be swallowed */

conn_ref_t ref; /* connection reference handler */
conn_unref_t unref; /* connection unreference handler */
Expand Down
2 changes: 2 additions & 0 deletions src/nc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ typedef enum msg_parse_result {
ACTION( REQ_REDIS_PING ) /* redis requests - ping/quit */ \
ACTION( REQ_REDIS_QUIT) \
ACTION( REQ_REDIS_AUTH) \
ACTION( REQ_REDIS_SELECT) /* only during init */ \
ACTION( RSP_REDIS_STATUS ) /* redis response */ \
ACTION( RSP_REDIS_ERROR ) \
ACTION( RSP_REDIS_INTEGER ) \
Expand Down Expand Up @@ -271,6 +272,7 @@ void req_put(struct msg *msg);
bool req_done(struct conn *conn, struct msg *msg);
bool req_error(struct conn *conn, struct msg *msg);
void req_server_enqueue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg);
void req_server_enqueue_imsgq_head(struct context *ctx, struct conn *conn, struct msg *msg);
void req_server_dequeue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg);
void req_client_enqueue_omsgq(struct context *ctx, struct conn *conn, struct msg *msg);
void req_server_enqueue_omsgq(struct context *ctx, struct conn *conn, struct msg *msg);
Expand Down
24 changes: 24 additions & 0 deletions src/nc_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,30 @@ req_server_enqueue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg
stats_server_incr_by(ctx, conn->owner, in_queue_bytes, msg->mlen);
}

void
req_server_enqueue_imsgq_head(struct context *ctx, struct conn *conn, struct msg *msg)
{
ASSERT(msg->request);
ASSERT(!conn->client && !conn->proxy);

/*
* timeout clock starts ticking the instant the message is enqueued into
* the server in_q; the clock continues to tick until it either expires
* or the message is dequeued from the server out_q
*
* noreply request are free from timeouts because client is not intrested
* in the reponse anyway!
*/
if (!msg->noreply) {
msg_tmo_insert(msg, conn);
}

TAILQ_INSERT_HEAD(&conn->imsg_q, msg, s_tqe);

stats_server_incr(ctx, conn->owner, in_queue);
stats_server_incr_by(ctx, conn->owner, in_queue_bytes, msg->mlen);
}

void
req_server_dequeue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg)
{
Expand Down
4 changes: 4 additions & 0 deletions src/nc_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg)
ASSERT(pmsg->request && !pmsg->done);

if (pmsg->swallow) {
if (conn->swallow_msg) {
conn->swallow_msg(conn, pmsg, msg);
}

conn->dequeue_outq(ctx, conn, pmsg);
pmsg->done = 1;

Expand Down
6 changes: 6 additions & 0 deletions src/nc_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ server_close(struct context *ctx, struct conn *conn)
server_close_stats(ctx, conn->owner, conn->err, conn->eof,
conn->connected);

conn->connected = false;

if (conn->sd < 0) {
server_failure(ctx, conn->owner);
conn->unref(conn);
Expand Down Expand Up @@ -535,6 +537,10 @@ server_connected(struct context *ctx, struct conn *conn)
conn->connecting = 0;
conn->connected = 1;

if (conn->init) {
conn->init(ctx, conn, server);
}

log_debug(LOG_INFO, "connected on s %d to server '%.*s'", conn->sd,
server->pname.len, server->pname.data);
}
Expand Down
1 change: 1 addition & 0 deletions src/nc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ struct server_pool {
struct string hash_tag; /* key hash tag (ref in conf_pool) */
int timeout; /* timeout in msec */
int backlog; /* listen backlog */
int redis_db; /* redis database to connect to */
uint32_t client_connections; /* maximum # client connection */
uint32_t server_connections; /* maximum # server connection */
int64_t server_retry_timeout; /* server retry timeout in usec */
Expand Down
2 changes: 2 additions & 0 deletions src/proto/nc_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,7 @@ void redis_post_coalesce(struct msg *r);
rstatus_t redis_add_auth_packet(struct context *ctx, struct conn *c_conn, struct conn *s_conn);
rstatus_t redis_fragment(struct msg *r, uint32_t ncontinuum, struct msg_tqh *frag_msgq);
rstatus_t redis_reply(struct msg *r);
void redis_conn_init(struct context *ctx, struct conn *conn, struct server *server);
void redis_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg);

#endif
82 changes: 82 additions & 0 deletions src/proto/nc_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <stdio.h>
#include <ctype.h>
#include <math.h>

#include <nc_core.h>
#include <nc_proto.h>
Expand Down Expand Up @@ -2659,3 +2660,84 @@ redis_add_auth_packet(struct context *ctx, struct conn *c_conn, struct conn *s_c

return NC_OK;
}

void
redis_conn_init(struct context *ctx, struct conn *conn, struct server *server)
{
ASSERT(!conn->client && conn->connected);

if (conn->redis && server->owner->redis_db > 0)
{
uint8_t command[64];
int digits;
size_t commandlen;
struct msg *msg;
struct mbuf *mbuf;

digits = (server->owner->redis_db >= 10)
? (int)log10(server->owner->redis_db) + 1
: 1;

commandlen = nc_snprintf(command, sizeof(command), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%d\r\n", digits, server->owner->redis_db);

/*
* Create a fake client message and add it to the pipeline
* We force this message to be head of queue as it might already contain
* a command that triggered the connect.
*/

msg = msg_get(conn, true, conn->redis);
mbuf = mbuf_get();

mbuf_copy(mbuf, command, commandlen);
mbuf_insert(&msg->mhdr, mbuf);

msg->pos = mbuf->pos;
msg->mlen += (uint32_t)commandlen;
msg->type = MSG_REQ_REDIS_SELECT;
msg->result = MSG_PARSE_OK;
msg->swallow = 1;
msg->owner = NULL;

/* Enqueue as head and send */

req_server_enqueue_imsgq_head(ctx, conn, msg);
msg_send(ctx, conn);

log_debug(LOG_NOTICE, "sent \"SELECT %d\" to %s | %s. ",
server->owner->redis_db, server->owner->name.data, server->name.data);
}
}

void
redis_swallow_msg(struct conn *conn, struct msg *pmsg, struct msg *msg)
{
if (pmsg != NULL && pmsg->type == MSG_REQ_REDIS_SELECT &&
msg != NULL && msg->type == MSG_RSP_REDIS_ERROR)
{
struct server* conn_server;
struct server_pool* conn_pool;
struct mbuf* rsp_buffer;
uint8_t message[128];
size_t copy_len;

/*
* Get a substring from the message so that the inital - and the trailing
* \r\n is removed.
*/

conn_server = (struct server*)conn->owner;
conn_pool = conn_server->owner;
rsp_buffer = STAILQ_LAST(&msg->mhdr, mbuf, next);
copy_len = MIN(mbuf_length(rsp_buffer)-3, sizeof(message)-1);

nc_memcpy(message, &rsp_buffer->start[1], copy_len);
message[copy_len] = 0;

log_warn("SELECT %d failed on %s | %s: %s",
conn_pool->redis_db,
conn_pool->name.data,
conn_server->name.data,
message);
}
}

0 comments on commit 37ca913

Please sign in to comment.