Skip to content

Commit

Permalink
proxy: clean logic around lua yielding
Browse files Browse the repository at this point in the history
We were duck typing the response code for a coroutine yield before. It
would also pile random logic for overriding IO's in certain cases. This
now makes everything explicit and more clear.
  • Loading branch information
dormando committed Jan 12, 2023
1 parent 0e87c6d commit 6537cfe
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 47 deletions.
82 changes: 37 additions & 45 deletions proto_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#define PROCESS_NORMAL false
static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget);
static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc);
static void proxy_out_errstring(mc_resp *resp, const char *str);

/******** EXTERNAL FUNCTIONS ******/
// functions starting with _ are breakouts for the public functions.
Expand Down Expand Up @@ -469,7 +468,7 @@ void proxy_lua_ferror(lua_State *L, const char *fmt, ...) {
}

// Need a custom function so we can prefix lua strings easily.
static void proxy_out_errstring(mc_resp *resp, const char *str) {
void proxy_out_errstring(mc_resp *resp, const char *str) {
size_t len;
const static char error_prefix[] = "SERVER_ERROR ";
const static int error_prefix_len = sizeof(error_prefix) - 1;
Expand Down Expand Up @@ -580,52 +579,45 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
} else {
proxy_out_errstring(resp, "bad response");
}

} else if (cores == LUA_YIELD) {
if (nresults == 1) {
// TODO (v2): try harder to validate; but we have so few yield cases
// that I'm going to shortcut this here. A single yielded result
// means it's probably an await(), so attempt to process this.
if (p != NULL) {
int coro_ref = p->coro_ref;
mc_resp *resp = p->resp;
assert((void *)p == (void *)resp->io_pending);
resp->io_pending = NULL;
c = p->c;
do_cache_free(c->thread->io_cache, p);
mcplib_await_run(c, resp, Lc, coro_ref);
} else {
// coroutine object sitting on the _main_ VM right now, so we grab
// the reference from there, which also pops it.
int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
mcplib_await_run(c, c->resp, Lc, coro_ref);
}
int coro_ref = 0;
int yield_type = lua_tointeger(Lc, -1);
assert(yield_type != 0);
lua_pop(Lc, 1);

// need to remove and free the io_pending, since c->resp owns it.
// so we call mcp_queue_io() again and let it override the
// mc_resp's io_pending object.
//
// p is not null only when being called from proxy_return_cb(),
// a pending IO is returning to resume.
if (p != NULL) {
coro_ref = p->coro_ref;
assert((void *)p == (void *)resp->io_pending);
resp->io_pending = NULL;
c = p->c;
// *p is now dead.
do_cache_free(c->thread->io_cache, p);
} else {
// need to remove and free the io_pending, since c->resp owns it.
// so we call mcp_queue_io() again and let it override the
// mc_resp's io_pending object.

int coro_ref = 0;
mc_resp *resp;
if (p != NULL) {
coro_ref = p->coro_ref;
resp = p->resp;
c = p->c;
do_cache_free(p->c->thread->io_cache, p);
// *p is now dead.
} else {
// yielding from a top level call to the coroutine,
// so we need to grab a reference to the coroutine thread.
// TODO (v2): make this more explicit?
// we only need to get the reference here, and error conditions
// should instead drop it, but now it's not obvious to users that
// we're reaching back into the main thread's stack.
assert(c != NULL);
coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
resp = c->resp;
}
// TODO (v2): c only used for cache alloc? push the above into the func?
mcp_queue_io(c, resp, coro_ref, Lc);
// coroutine object sitting on the _main_ VM right now, so we grab
// the reference from there, which also pops it.
assert(c != NULL);
coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
}

switch (yield_type) {
case MCP_YIELD_AWAIT:
mcplib_await_run(c, resp, Lc, coro_ref);
break;
case MCP_YIELD_POOL:
// TODO (v2): c only used for cache alloc?
mcp_queue_io(c, resp, coro_ref, Lc);
break;
default:
abort();
}

} else {
WSTAT_DECR(c, proxy_req_active, 1);
P_DEBUG("%s: Failed to run coroutine: %s\n", __func__, lua_tostring(Lc, -1));
Expand Down
5 changes: 5 additions & 0 deletions proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@
#define MCP_BACKEND_UPVALUE 3
#define MCP_CONTEXT_UPVALUE 4

#define MCP_YIELD_POOL 1
#define MCP_YIELD_AWAIT 2
#define MCP_YIELD_LOCAL 3

// all possible commands.
#define CMD_FIELDS \
X(CMD_MG) \
Expand Down Expand Up @@ -507,6 +511,7 @@ void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p);
int mcp_request_render(mcp_request_t *rq, int idx, const char *tok, size_t len);
void proxy_lua_error(lua_State *L, const char *s);
void proxy_lua_ferror(lua_State *L, const char *fmt, ...);
void proxy_out_errstring(mc_resp *resp, const char *str);
int _start_proxy_config_threads(proxy_ctx_t *ctx);
int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr);

Expand Down
3 changes: 2 additions & 1 deletion proxy_await.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ int mcplib_await(lua_State *L) {
aw->type = type;
P_DEBUG("%s: about to yield [len: %d]\n", __func__, n);

return lua_yield(L, 1);
lua_pushinteger(L, MCP_YIELD_AWAIT);
return lua_yield(L, 2);
}

static void mcp_queue_await_io(conn *c, lua_State *Lc, mcp_request_t *rq, int await_ref, bool await_first) {
Expand Down
3 changes: 2 additions & 1 deletion proxy_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,8 @@ static int mcplib_pool_proxy_call(lua_State *L) {
rq->be = mcplib_pool_proxy_call_helper(L, p, key, len);

// now yield request, pool up.
return lua_yield(L, 2);
lua_pushinteger(L, MCP_YIELD_POOL);
return lua_yield(L, 3);
}

static int mcplib_tcp_keepalive(lua_State *L) {
Expand Down

0 comments on commit 6537cfe

Please sign in to comment.