Skip to content

Commit

Permalink
proxy: allow await() to be called recursively
Browse files Browse the repository at this point in the history
previously mcp.await() only worked if it was called before any other
dispatches.

also fixes a bug if the supplied pool table was key=value instead of an
array-type table.
  • Loading branch information
dormando committed Mar 3, 2022
1 parent 31ccfc1 commit 1d825ef
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 22 deletions.
28 changes: 20 additions & 8 deletions proto_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,15 @@ void proxy_submit_cb(io_queue_t *q) {
while (p) {
// insert into tail so head is oldest request.
STAILQ_INSERT_TAIL(&head, p, io_next);
if (!p->is_await) {
if (p->is_await) {
// need to not count await objects multiple times.
if (p->await_first) {
q->count++;
}
// funny workaround: awaiting IOP's don't count toward
// resuming a connection, only the completion of the await
// condition.
} else {
q->count++;
}

Expand Down Expand Up @@ -557,13 +562,20 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
// TODO (v2): try harder to validate; but we have so few yield cases
// that I'm going to shortcut this here. A single yielded result
// means it's probably an await(), so attempt to process this.
// FIXME (v2): if p, do we need to free it up from the resp?
// resp should not have an IOP I think...
assert(p == NULL);
// coroutine object sitting on the _main_ VM right now, so we grab
// the reference from there, which also pops it.
int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
mcplib_await_run(c, Lc, coro_ref);
if (p != NULL) {
int coro_ref = p->coro_ref;
mc_resp *resp = p->resp;
assert((void *)p == (void *)resp->io_pending);
resp->io_pending = NULL;
c = p->c;
do_cache_free(c->thread->io_cache, p);
mcplib_await_run(c, resp, Lc, coro_ref);
} else {
// coroutine object sitting on the _main_ VM right now, so we grab
// the reference from there, which also pops it.
int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
mcplib_await_run(c, c->resp, Lc, coro_ref);
}
} else {
// need to remove and free the io_pending, since c->resp owns it.
// so we call mcp_queue_io() again and let it override the
Expand Down
2 changes: 1 addition & 1 deletion proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ enum mcp_await_e {
AWAIT_FIRST, // return the result from the first pool
};
int mcplib_await(lua_State *L);
int mcplib_await_run(conn *c, lua_State *L, int coro_ref);
int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref);
int mcplib_await_return(io_pending_proxy_t *p);

// user stats interface
Expand Down
22 changes: 12 additions & 10 deletions proxy_await.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@ typedef struct mcp_await_s {
int mcplib_await(lua_State *L) {
mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
luaL_checktype(L, 2, LUA_TTABLE);
int n = luaL_len(L, 2); // length of hash selector table
int n = 0; // length of table of pools
int wait_for = 0; // 0 means wait for all responses
enum mcp_await_e type = AWAIT_GOOD;

lua_pushnil(L); // init table key
while (lua_next(L, 2) != 0) {
luaL_checkudata(L, -1, "mcp.pool_proxy");
lua_pop(L, 1); // remove value, keep key.
n++;
}

if (n <= 0) {
proxy_lua_error(L, "mcp.await arguments must have at least one pool");
}
Expand Down Expand Up @@ -76,7 +83,7 @@ int mcplib_await(lua_State *L) {
aw->rq = rq;
aw->req_ref = req_ref;
aw->type = type;
P_DEBUG("%s: about to yield [HS len: %d]\n", __func__, n);
P_DEBUG("%s: about to yield [len: %d]\n", __func__, n);

return lua_yield(L, 1);
}
Expand Down Expand Up @@ -175,7 +182,7 @@ static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int aw
// It looks like a bulk of this code can move into mcplib_await(),
// and then here post-yield we can add the conn and coro_ref to the right
// places. Else these errors currently crash the daemon.
int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
int mcplib_await_run(conn *c, mc_resp *resp, lua_State *L, int coro_ref) {
P_DEBUG("%s: start\n", __func__);
mcp_await_t *aw = lua_touserdata(L, -1);
int await_ref = luaL_ref(L, LUA_REGISTRYINDEX); // await is popped.
Expand Down Expand Up @@ -217,14 +224,9 @@ int mcplib_await_run(conn *c, lua_State *L, int coro_ref) {
}

lua_pop(L, 1); // remove table key.
aw->resp = c->resp; // cuddle the current mc_resp to fill later

// we count the await as the "response pending" since it covers a single
// response object. the sub-IO's don't count toward the redispatch of *c
io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
q->count++;
aw->resp = resp; // cuddle the current mc_resp to fill later

P_DEBUG("%s\n", __func__);
P_DEBUG("%s: end\n", __func__);

return 0;
}
Expand Down
10 changes: 7 additions & 3 deletions t/startfile.lua
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,16 @@ function failover_factory(zones, local_zone)
if res:hit() == false then
-- example for mcp.log... Don't do this though :)
mcp.log("failed to find " .. r:key() .. " in zone: " .. local_zone)
for _, zone in pairs(far_zones) do
res = zone(r)
--for _, zone in pairs(far_zones) do
-- res = zone(r)
local restable = mcp.await(r, far_zones, 1)
for _, res in pairs(restable) do
if res:hit() then
break
--break
return res
end
end
return restable[1]
end
return res -- send result back to client
end
Expand Down

0 comments on commit 1d825ef

Please sign in to comment.