Skip to content

Commit

Permalink
proxy: await improvements
Browse files Browse the repository at this point in the history
Adds MCP_AWAIT_* options as 4th argument. If waiting for a subset of
requests instead of all requests, the 4th argument can specify what type
of responses are considered valid.

Also now includes _all_ observed results in the result table, when a
specific number of "good" results are requested. The caller is supposed
to filter for its intent. This allows observation of error codes if all
requested results fail or it otherwise fails to meet its result
conditions (via AWAIT_*)
  • Loading branch information
dormando committed Feb 17, 2022
1 parent 90b9e5c commit 774630e
Showing 1 changed file with 101 additions and 29 deletions.
130 changes: 101 additions & 29 deletions proto_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ struct _io_pending_proxy_t {
bool flushed; // whether we've fully written this request to a backend.
bool ascii_multiget; // passed on from mcp_r_t
bool is_await; // are we an await object?
bool await_first; // are we the main route for an await object?
};

// Note: does *be have to be a sub-struct? how stable are userdata pointers?
Expand Down Expand Up @@ -3402,18 +3403,6 @@ static int mcplib_attach(lua_State *L) {
return 0;
}

static void proxy_register_defines(lua_State *L) {
#define X(x) \
lua_pushinteger(L, x); \
lua_setfield(L, -2, #x);

X(P_OK);
X(CMD_ANY);
X(CMD_ANY_STORAGE);
CMD_FIELDS
#undef X
}

/*** REQUEST PARSER AND OBJECT ***/

#define PARSER_MAXLEN USHRT_MAX-1
Expand Down Expand Up @@ -4252,18 +4241,31 @@ static int mcplib_stat(lua_State *L) {

/*** START lua await() object interface ***/

enum mcp_await_e {
AWAIT_GOOD = 0, // looks for OK + NOT MISS
AWAIT_ANY, // any response, including errors,
AWAIT_OK, // any non-error response
AWAIT_FIRST, // return the result from the first pool
};

typedef struct mcp_await_s {
int pending;
int wait_for;
int req_ref;
int argtable_ref; // need to hold refs to any potential hash selectors
int restable_ref; // table of result objects
int coro_ref; // reference to parent coroutine
enum mcp_await_e type;
bool completed; // have we completed the parent coroutine or not
mcp_request_t *rq;
mc_resp *resp; // the top level mc_resp to fill in (as if we were an iop)
} mcp_await_t;

// TODO (v2): mcplib_await_gc()
// - needs to handle cases where an await is created, but a rare error happens
// before it completes and the coroutine is killed. must check and free its
// references.

// local restable = mcp.await(request, pools, num_wait)
// NOTE: need to hold onto the pool objects since those hold backend
// references. Here we just keep a reference to the argument table.
Expand All @@ -4272,6 +4274,7 @@ static int mcplib_await(lua_State *L) {
luaL_checktype(L, 2, LUA_TTABLE);
int n = luaL_len(L, 2); // length of hash selector table
int wait_for = 0; // 0 means wait for all responses
enum mcp_await_e type = AWAIT_GOOD;

if (n <= 0) {
proxy_lua_error(L, "mcp.await arguments must have at least one pool");
Expand All @@ -4284,40 +4287,57 @@ static int mcplib_await(lua_State *L) {
}
}

if (lua_isnumber(L, 4)) {
type = lua_tointeger(L, 4);
switch (type) {
case AWAIT_GOOD:
case AWAIT_ANY:
case AWAIT_OK:
case AWAIT_FIRST:
break;
default:
proxy_lua_error(L, "invalid type argument tp mcp.await");
}
}

// FIRST is only looking for one valid request.
if (type == AWAIT_FIRST) {
wait_for = 1;
}

// TODO (v2): quickly loop table once and ensure they're all pools?
// TODO (v2) in case of newuserdatauv throwing an error, we need to grab
// these references after allocating *aw else can leak memory.
int argtable_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops the arg table
int req_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops request object.

// stack will be only the await object now
// allocate before grabbing references so an error won't cause leaks.
mcp_await_t *aw = lua_newuserdatauv(L, sizeof(mcp_await_t), 0);
memset(aw, 0, sizeof(mcp_await_t));

aw->wait_for = wait_for;
aw->pending = n;
aw->argtable_ref = argtable_ref;
aw->rq = rq;
aw->req_ref = req_ref;
aw->type = type;
P_DEBUG("%s: about to yield [HS len: %d]\n", __func__, n);
//dump_stack(L);

return lua_yield(L, 1);
}

static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref) {
static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref, bool await_first) {
io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);

mcp_backend_t *be = rq->be;

// Then we push a response object, which we'll re-use later.
// reserve one uservalue for a lua-supplied response.
mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
if (r == NULL) {
proxy_lua_error(Lc, "out of memory allocating response");
return;
}
memset(r, 0, sizeof(mcp_resp_t));
r->buf = NULL;
r->blen = 0;
r->start = rq->start;

int x;
int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
for (x = 0; x < end; x++) {
Expand Down Expand Up @@ -4361,6 +4381,7 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
// await specific
p->is_await = true;
p->await_ref = await_ref;
p->await_first = await_first;

// The direct backend object. await object is holding reference
p->backend = be;
Expand All @@ -4375,6 +4396,10 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
return;
}

// TODO (v2): need to get this code running under pcall().
// It looks like a bulk of this code can move into mcplib_await(),
// and then here post-yield we can add the conn and coro_ref to the right
// places. Else these errors currently crash the daemon.
static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
P_DEBUG("%s: start\n", __func__);
mcp_await_t *aw = lua_touserdata(L, -1);
Expand All @@ -4393,6 +4418,7 @@ static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
// prepare the request key
const char *key = MCP_PARSER_KEY(rq->pr);
size_t len = rq->pr.klen;
bool await_first = true;
// loop arg table and run each hash selector
lua_pushnil(L); // -> 3
while (lua_next(L, 1) != 0) {
Expand All @@ -4408,7 +4434,8 @@ static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
// mcp_queue call. Could be a local variable and an argument too.
rq->be = _mcplib_pool_proxy_call_helper(L, p, key, len);

mcp_queue_await_io(c, L, rq, await_ref);
mcp_queue_await_io(c, L, rq, await_ref, await_first);
await_first = false;

// pop value, keep key.
lua_pop(L, 1);
Expand All @@ -4423,17 +4450,20 @@ static int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
q->count++;

P_DEBUG("%s\n", __func__);
//dump_stack(L); // should be empty

return 0;
}

//lua_rawseti(L, -2, x++);
// NOTE: This is unprotected lua/C code. There are no lua-style errors thrown
// purposefully as of this writing, but it's still not safe. Either the code
// can be restructured to use less lua (which I think is better long term
// anyway) or it can be pushed behind a cfunc pcall so we don't crash the
// daemon if something bad happens.
static int mcplib_await_return(io_pending_proxy_t *p) {
mcp_await_t *aw;
lua_State *L = p->thread->L; // use the main VM coroutine for work
bool cleanup = false;
bool valid = false;
bool valid = false; // is response valid to add to the result table.
bool completing = false;

// TODO (v2): just push the await ptr into *p?
Expand All @@ -4450,18 +4480,44 @@ static int mcplib_await_return(io_pending_proxy_t *p) {
// if success and wait_for is *now* 0, we complete.
// add successful response to response table
// Also, if no wait_for, add response to response table
// TODO (v2): for GOOD or OK cases, it might be better to return the
// last object as valid if there are otherwise zero valids?
// Think we just have to count valids...
if (!aw->completed) {
valid = true; // always collect results unless we are completed.
if (aw->wait_for > 0) {
if (p->client_resp->status == MCMC_OK && p->client_resp->resp.code != MCMC_CODE_MISS) {
valid = true;
bool is_good = false;
switch (aw->type) {
case AWAIT_GOOD:
if (p->client_resp->status == MCMC_OK && p->client_resp->resp.code != MCMC_CODE_MISS) {
is_good = true;
}
break;
case AWAIT_ANY:
is_good = true;
break;
case AWAIT_OK:
if (p->client_resp->status == MCMC_OK) {
is_good = true;
}
break;
case AWAIT_FIRST:
if (p->await_first) {
is_good = true;
} else {
// user only wants the first pool's result.
valid = false;
}
break;
}

if (is_good) {
aw->wait_for--;
}
aw->wait_for--;

if (aw->wait_for == 0) {
completing = true;
}
} else {
valid = true;
}
}

Expand Down Expand Up @@ -4546,6 +4602,22 @@ static int mcplib_open_hash_xxhash(lua_State *L) {

/*** END xxhash module ***/

static void proxy_register_defines(lua_State *L) {
#define X(x) \
lua_pushinteger(L, x); \
lua_setfield(L, -2, #x);

X(P_OK);
X(CMD_ANY);
X(CMD_ANY_STORAGE);
X(AWAIT_GOOD);
X(AWAIT_ANY);
X(AWAIT_OK);
X(AWAIT_FIRST);
CMD_FIELDS
#undef X
}

// Creates and returns the top level "mcp" module
int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) {
lua_State *L = ctx;
Expand Down

0 comments on commit 774630e

Please sign in to comment.