Skip to content

Commit

Permalink
Use contexts for new async demo.
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Apr 12, 2018
1 parent ac917f6 commit 7e4fa0f
Show file tree
Hide file tree
Showing 6 changed files with 376 additions and 29 deletions.
50 changes: 21 additions & 29 deletions demo/async/README.adoc
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
= async

This is a simple asynchronous demo, that demonstrates use of the RAW
option with a server, along with async message handling, to obtain a
very high level of asynchronous operation, suitable for use in a highly
concurrent server application.
This is a simple asynchronous demo, that demonstrates use of the contexts
and asynchronous message handling and operations, to obtain highly concurrent
processing with minimal fuss.

== Compiling

You can override the level of concurrency with the `PARALLEL`
define. This determines how many requests the server will accept
at a time, and keep outstanding. Note that for our toy
implementation, we create this many "logical" flows of execution
(these are _NOT_ threads), where a request is followed by a reply.
at a time, and keep outstanding. Note that for our toy implementation,
we create this many "logical" flows of execution (contexts) (these are
_NOT_ threads), where a request is followed by a reply.

The value of `PARALLEL` must be at least one, and may be as large
as your memory will permit. (The default value is 32.)
as your memory will permit. (The default value is 128.) Probably
you want the value to be small enough to ensure that you have enough
file descriptors. (You can create more contexts than this, but generally
you can't have more than one client per descriptor. Contexts can be used
on the client side to support many thousands of concurrent requests over
even just a single TCP connection, however.)

On UNIX-style systems:

Expand All @@ -23,31 +27,19 @@ On UNIX-style systems:
% export CPPFLAGS="-D PARALLEL=32 -I /usr/local/include"
% export LDFLAGS="-L /usr/local/lib -lnng"
% export CC="cc"
% ${CC} ${CPPFLAGS} async.c -o async ${LDFLAGS}
% ${CC} ${CPPFLAGS} server.c -o server ${LDFLAGS}
% ${CC} ${CPPFLAGS} client.c -o client ${LDFLAGS}
----

== Running

To run the server, use the arguments `__url__ -s`.
The easiest thing is to simply use the `run.sh` script, which
sends COUNT (10) random jobs to the server in parallel.

To run the client, use the arguments `__url__ __msec__`.
You can of course run the client and server manually instead.

The _msec_ is a "delay" time that server will wait before responding.
We have these delays so simulate long running work.
The server takes the address (url) as its only argument.

In the following example, all of the clients should complete within
2 seconds. (Assuming `PARALLEL` is defined to be large enough.)

[source,bash]
----
% export URL="tcp://127.0.0.1:55995"
# start the server
% ./async $URL -s &
# start a bunch of clients
# Note that these all run concurrently!
% ./async $URL 2 &
% ./async $URL 2 &
% ./async $URL 2 &
% ./async $URL 2 &
% ./async $URL 2 &
----
The client takes the address (url), followed by the number of
milliseconds the server should "wait" before responding (to simulate
an expensive operation.)
96 changes: 96 additions & 0 deletions demo/async/client.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2018 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

// This program is just a simple client application for our demo server.
// It is in a separate file to keep the server code clearer to understand.
//
// Our demonstration application layer protocol is simple. The client sends
// a number of milliseconds to wait before responding. The server just gives
// back an empty reply after waiting that long.

// For example:
//
// % ./server tcp://127.0.0.1:5555 &
// % ./client tcp://127.0.0.1:5555 323
// Request took 324 milliseconds.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>

#include <nng/nng.h>
#include <nng/protocol/reqrep0/req.h>
#include <nng/supplemental/util/platform.h>

void
fatal(const char *func, int rv)
{
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
exit(1);
}

/* The client runs just once, and then returns. */
int
client(const char *url, const char *msecstr)
{
nng_socket sock;
int rv;
nng_msg * msg;
nng_time start;
nng_time end;
unsigned msec;

msec = atoi(msecstr);

if ((rv = nng_req0_open(&sock)) != 0) {
fatal("nng_req0_open", rv);
}

if ((rv = nng_dial(sock, url, NULL, 0)) < 0) {
fatal("nng_dial", rv);
}

start = nng_clock();

if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
fatal("nng_msg_alloc", rv);
}
if ((rv = nng_msg_append_u32(msg, msec)) != 0) {
fatal("nng_msg_append_u32", rv);
}

if ((rv = nng_sendmsg(sock, msg, 0)) != 0) {
fatal("nng_send", rv);
}

if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
fatal("nng_recvmsg", rv);
}
end = nng_clock();
nng_msg_free(msg);
nng_close(sock);

printf("Request took %u milliseconds.\n", (uint32_t)(end - start));
return (0);
}

int
main(int argc, char **argv)
{
int rc;

if (argc != 3) {
fprintf(stderr, "Usage: %s <url> <secs>\n", argv[0]);
exit(EXIT_FAILURE);
}
rc = client(argv[1], argv[2]);
exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}
27 changes: 27 additions & 0 deletions demo/async/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/bash

ADDR=ipc:///tmp/async_demo
COUNT=10

./server $ADDR &
SERVER_PID=$!
trap "kill $SERVER_PID" 0
typeset -a CLIENT_PID
i=0
sleep 1
while (( i < COUNT ))
do
i=$(( i + 1 ))
rnd=$(( RANDOM % 1000 + 500 ))
echo "Starting client $i: server replies after $rnd msec"
./client $ADDR $rnd &
eval CLIENT_PID[$i]=$!
done

i=0
while (( i < COUNT ))
do
i=$(( i + 1 ))
wait ${CLIENT_PID[$i]}
done
kill $SERVER_PID
179 changes: 179 additions & 0 deletions demo/async/server.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2018 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

// This program serves as an example for how to write an async RPC service,
// using the request/reply pattern and contexts (nng_ctx(5)). The server
// allocates a number of contexts up front, which determines the amount of
// parallelism possible. The callbacks are handled asynchronously, so
// this could be done by threads, or something similar. For our uses we
// make use of an event driven architecture that we already have available.

// Our demonstration application layer protocol is simple. The client sends
// a number of milliseconds to wait before responding. The server just gives
// back an empty reply after waiting that long.

// To run this program, start the server as async_demo <url> -s
// Then connect to it with the client as async_client <url> <msec>.
//
// For example:
//
// % ./server tcp://127.0.0.1:5555 &
// % ./client tcp://127.0.0.1:5555 323
// Request took 324 milliseconds.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/supplemental/util/platform.h>

// Parallel is the maximum number of outstanding requests we can handle.
// This is *NOT* the number of threads in use, but instead represents
// outstanding work items. Select a small number to reduce memory size.
// (Each one of these can be thought of as a request-reply loop.) Note
// that you will probably run into limitations on the number of open file
// descriptors if you set this too high. (If not for that limit, this could
// be set in the thousands, each context consumes a couple of KB.)
#ifndef PARALLEL
#define PARALLEL 128
#endif

// The server keeps a list of work items, sorted by expiration time,
// so that we can use this to set the timeout to the correct value for
// use in poll.
struct work {
enum { INIT, RECV, WAIT, SEND } state;
nng_aio *aio;
nng_msg *msg;
nng_ctx ctx;
};

void
fatal(const char *func, int rv)
{
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
exit(1);
}

void
server_cb(void *arg)
{
struct work *work = arg;
nng_msg * msg;
int rv;
uint32_t when;

switch (work->state) {
case INIT:
work->state = RECV;
nng_ctx_recv(work->ctx, work->aio);
break;
case RECV:
if ((rv = nng_aio_result(work->aio)) != 0) {
fatal("nng_ctx_recv", rv);
}
msg = nng_aio_get_msg(work->aio);
if ((rv = nng_msg_trim_u32(msg, &when)) != 0) {
// bad message, just ignore it.
nng_msg_free(msg);
nng_ctx_recv(work->ctx, work->aio);
return;
}
work->msg = msg;
work->state = WAIT;
nng_sleep_aio(when, work->aio);
break;
case WAIT:
// We could add more data to the message here.
nng_aio_set_msg(work->aio, work->msg);
work->msg = NULL;
work->state = SEND;
nng_ctx_send(work->ctx, work->aio);
break;
case SEND:
if ((rv = nng_aio_result(work->aio)) != 0) {
nng_msg_free(work->msg);
fatal("nng_ctx_send", rv);
}
work->state = RECV;
nng_ctx_recv(work->ctx, work->aio);
break;
default:
fatal("bad state!", NNG_ESTATE);
break;
}
}

struct work *
alloc_work(nng_socket sock)
{
struct work *w;
int rv;

if ((w = nng_alloc(sizeof(*w))) == NULL) {
fatal("nng_alloc", NNG_ENOMEM);
}
if ((rv = nng_aio_alloc(&w->aio, server_cb, w)) != 0) {
fatal("nng_aio_alloc", rv);
}
if ((rv = nng_ctx_open(&w->ctx, sock)) != 0) {
fatal("nng_ctx_open", rv);
}
w->state = INIT;
return (w);
}

// The server runs forever.
int
server(const char *url)
{
nng_socket sock;
struct work *works[PARALLEL];
int rv;
int i;

/* Create the socket. */
rv = nng_rep0_open(&sock);
if (rv != 0) {
fatal("nng_rep0_open", rv);
}

for (i = 0; i < PARALLEL; i++) {
works[i] = alloc_work(sock);
}

if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
fatal("nng_listen", rv);
}

for (i = 0; i < PARALLEL; i++) {
server_cb(works[i]); // this starts them going (INIT state)
}

for (;;) {
nng_msleep(3600000); // neither pause() nor sleep() portable
}
}

int
main(int argc, char **argv)
{
int rc;

if (argc != 2) {
fprintf(stderr, "Usage: %s <url>\n", argv[0]);
exit(EXIT_FAILURE);
}
rc = server(argv[1]);
exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}
Loading

0 comments on commit 7e4fa0f

Please sign in to comment.