Skip to content

Commit

Permalink
Merge pull request grpc#10897 from ctiller/serve_fries
Browse files Browse the repository at this point in the history
Switch default CQ count for thread manager to num_cpus
  • Loading branch information
ctiller authored May 16, 2017
2 parents 8271363 + 3cf000f commit bcb65f0
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 87 deletions.
2 changes: 1 addition & 1 deletion include/grpc++/server_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class ServerBuilder {

struct SyncServerSettings {
SyncServerSettings()
: num_cqs(gpr_cpu_num_cores()),
: num_cqs(GPR_MAX(1, gpr_cpu_num_cores())),
min_pollers(1),
max_pollers(2),
cq_timeout_msec(10000) {}
Expand Down
25 changes: 24 additions & 1 deletion src/core/lib/support/mpscq.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ void gpr_mpscq_destroy(gpr_mpscq *q) {
GPR_ASSERT(q->tail == &q->stub);
}

void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
gpr_mpscq_node *prev =
(gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
gpr_atm_rel_store(&prev->next, (gpr_atm)n);
return prev == &q->stub;
}

gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
Expand Down Expand Up @@ -92,3 +93,25 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) {
*empty = false;
return NULL;
}

// Prepare a locked mpscq for use: the read lock is set to its initializer
// value and the wrapped queue is initialized. The two steps touch different
// members of *q and do not depend on each other.
void gpr_locked_mpscq_init(gpr_locked_mpscq *q) {
  gpr_mpscq *inner = &q->queue;
  q->read_lock = GPR_SPINLOCK_INITIALIZER;
  gpr_mpscq_init(inner);
}

// Tear down a locked mpscq by destroying the wrapped queue. Nothing is done
// to the read lock here.
void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) {
  gpr_mpscq *inner = &q->queue;
  gpr_mpscq_destroy(inner);
}

// Push node n onto the wrapped queue and forward gpr_mpscq_push's result
// unchanged. The read lock is not taken on this path.
bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) {
  gpr_mpscq *inner = &q->queue;
  bool push_result = gpr_mpscq_push(inner, n);
  return push_result;
}

// Try to pop one node from the wrapped queue without blocking.
// Returns NULL when the read lock could not be acquired, or when
// gpr_mpscq_pop itself returns NULL; otherwise returns the popped node.
gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) {
  if (!gpr_spinlock_trylock(&q->read_lock)) {
    // Lock not acquired: give up immediately instead of waiting.
    return NULL;
  }
  // The read lock is held only for the duration of the underlying pop.
  gpr_mpscq_node *popped = gpr_mpscq_pop(&q->queue);
  gpr_spinlock_unlock(&q->read_lock);
  return popped;
}
27 changes: 25 additions & 2 deletions src/core/lib/support/mpscq.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <grpc/support/atm.h>
#include <stdbool.h>
#include <stddef.h>
#include "src/core/lib/support/spinlock.h"

// Multiple-producer single-consumer lock free queue, based upon the
// implementation from Dmitry Vyukov here:
Expand All @@ -58,12 +59,34 @@ typedef struct gpr_mpscq {
void gpr_mpscq_init(gpr_mpscq *q);
void gpr_mpscq_destroy(gpr_mpscq *q);
// Push a node
void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
// Thread safe - can be called from multiple threads concurrently
// Returns true if this was possibly the first node (may return true
// sporadically, will not return false sporadically)
bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
// Pop a node (returns NULL if no node is ready - which doesn't indicate that
// the queue is empty!!)
// Thread compatible - can only be called from one thread at a time
gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q);

// Pop a node; sets *empty to true if the queue is empty, or false if it is not
gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty);

// An mpscq with a spinlock: it's safe to pop from multiple threads, but when
// several threads pop concurrently only one of them will succeed at a time
typedef struct gpr_locked_mpscq {
// The wrapped multiple-producer single-consumer queue
gpr_mpscq queue;
// Spinlock acquired (via trylock) by gpr_locked_mpscq_pop around the pop
gpr_spinlock read_lock;
} gpr_locked_mpscq;

void gpr_locked_mpscq_init(gpr_locked_mpscq *q);
void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q);
// Push a node
// Thread safe - can be called from multiple threads concurrently
// Returns true if this was possibly the first node (may return true
// sporadically, will not return false sporadically)
bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n);
// Pop a node (returns NULL if no node is ready, or if the read lock could not
// be acquired because another thread is popping - neither case indicates that
// the queue is empty!!)
// Thread safe - can be called from multiple threads concurrently
gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q);

#endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */
112 changes: 29 additions & 83 deletions src/core/lib/surface/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/stack_lockfree.h"
#include "src/core/lib/support/mpscq.h"
#include "src/core/lib/support/spinlock.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
Expand Down Expand Up @@ -76,6 +77,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false);

typedef struct requested_call {
gpr_mpscq_node request_link; /* must be first */
requested_call_type type;
size_t cq_idx;
void *tag;
Expand Down Expand Up @@ -175,7 +177,7 @@ struct request_matcher {
grpc_server *server;
call_data *pending_head;
call_data *pending_tail;
gpr_stack_lockfree **requests_per_cq;
gpr_locked_mpscq *requests_per_cq;
};

struct registered_method {
Expand Down Expand Up @@ -220,11 +222,6 @@ struct grpc_server {
registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
/** free list of available requested_calls_per_cq indices */
gpr_stack_lockfree **request_freelist_per_cq;
/** requested call backing data */
requested_call **requested_calls_per_cq;
int max_requested_calls_per_cq;

gpr_atm shutdown_flag;
uint8_t shutdown_published;
Expand Down Expand Up @@ -324,21 +321,20 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher
*/

static void request_matcher_init(request_matcher *rm, size_t entries,
grpc_server *server) {
static void request_matcher_init(request_matcher *rm, grpc_server *server) {
memset(rm, 0, sizeof(*rm));
rm->server = server;
rm->requests_per_cq =
gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count);
for (size_t i = 0; i < server->cq_count; i++) {
rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
}
}

static void request_matcher_destroy(request_matcher *rm) {
for (size_t i = 0; i < rm->server->cq_count; i++) {
GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL);
gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
}
gpr_free(rm->requests_per_cq);
}
Expand Down Expand Up @@ -368,13 +364,17 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
grpc_server *server,
request_matcher *rm,
grpc_error *error) {
int request_id;
requested_call *rc;
for (size_t i = 0; i < server->cq_count; i++) {
while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
-1) {
fail_call(exec_ctx, server, i,
&server->requested_calls_per_cq[i][request_id],
GRPC_ERROR_REF(error));
/* Here we know:
1. no requests are being added (since the server is shut down)
2. no other threads are pulling (since the shut down process is single
threaded)
So, we can ignore the queue lock and just pop, with the guarantee that a
NULL returned here truly means that the queue is empty */
while ((rc = (requested_call *)gpr_mpscq_pop(
&rm->requests_per_cq[i].queue)) != NULL) {
fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error));
}
}
GRPC_ERROR_UNREF(error);
Expand Down Expand Up @@ -409,13 +409,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server");
if (server->started) {
gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
gpr_free(server->requested_calls_per_cq[i]);
}
}
gpr_free(server->request_freelist_per_cq);
gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
Expand Down Expand Up @@ -473,21 +467,7 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,

static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
grpc_cq_completion *c) {
requested_call *rc = req;
grpc_server *server = rc->server;

if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
rc < server->requested_calls_per_cq[rc->cq_idx] +
server->max_requested_calls_per_cq) {
GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
gpr_stack_lockfree_push(
server->request_freelist_per_cq[rc->cq_idx],
(int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
} else {
gpr_free(req);
}

server_unref(exec_ctx, server);
gpr_free(req);
}

static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
Expand Down Expand Up @@ -516,10 +496,6 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
GPR_UNREACHABLE_CODE(return );
}

grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
channel_data *chand = elem->channel_data;
server_ref(chand->server);
grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
done_request_event, rc, &rc->completion);
}
Expand Down Expand Up @@ -547,15 +523,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,

for (size_t i = 0; i < server->cq_count; i++) {
size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
if (request_id == -1) {
requested_call *rc =
(requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
if (rc == NULL) {
continue;
} else {
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
&server->requested_calls_per_cq[cq_idx][request_id]);
publish_call(exec_ctx, server, calld, cq_idx, rc);
return; /* early out */
}
}
Expand Down Expand Up @@ -1029,8 +1005,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;

/* TODO(ctiller): expose a channel_arg for this */
server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args);

return server;
Expand Down Expand Up @@ -1103,29 +1077,15 @@ void grpc_server_start(grpc_server *server) {
server->started = true;
server->pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
server->request_freelist_per_cq =
gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
server->requested_calls_per_cq =
gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]);
}
server->request_freelist_per_cq[i] =
gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
}
server->requested_calls_per_cq[i] =
gpr_malloc((size_t)server->max_requested_calls_per_cq *
sizeof(*server->requested_calls_per_cq[i]));
}
request_matcher_init(&server->unregistered_request_matcher,
(size_t)server->max_requested_calls_per_cq, server);
request_matcher_init(&server->unregistered_request_matcher, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
request_matcher_init(&rm->request_matcher,
(size_t)server->max_requested_calls_per_cq, server);
request_matcher_init(&rm->request_matcher, server);
}

server_ref(server);
Expand Down Expand Up @@ -1379,21 +1339,11 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
requested_call *rc) {
call_data *calld = NULL;
request_matcher *rm = NULL;
int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
fail_call(exec_ctx, server, cq_idx, rc,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
return GRPC_CALL_OK;
}
request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
if (request_id == -1) {
/* out of request ids: just fail this one */
fail_call(exec_ctx, server, cq_idx, rc,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"),
GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq));
return GRPC_CALL_OK;
}
switch (rc->type) {
case BATCH_CALL:
rm = &server->unregistered_request_matcher;
Expand All @@ -1402,15 +1352,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &rc->data.registered.registered_method->request_matcher;
break;
}
server->requested_calls_per_cq[cq_idx][request_id] = *rc;
gpr_free(rc);
if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != NULL) {
request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
if (request_id == -1) break;
rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
if (rc == NULL) break;
rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
Expand All @@ -1426,8 +1374,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
&server->requested_calls_per_cq[cq_idx][request_id]);
publish_call(exec_ctx, server, calld, cq_idx, rc);
}
gpr_mu_lock(&server->mu_call);
}
Expand Down Expand Up @@ -1534,7 +1481,6 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
rc->initial_metadata->count = 0;
GPR_ASSERT(error != GRPC_ERROR_NONE);

server_ref(server);
grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error,
done_request_event, rc, &rc->completion);
}
Expand Down

0 comments on commit bcb65f0

Please sign in to comment.