Skip to content

Commit

Permalink
More pipe option handling, pipe API support. Url option.
Browse files Browse the repository at this point in the history
This fleshes most of the pipe API out, making it available to end user
code.  It also adds a URL option that is independent of the address
options (which would be sockaddrs.)

Also, we are now setting the pipe for req/rep.  The other protocols need
to have the same logic added to set the receive pipe on the message.  (Pair
is already done.)
  • Loading branch information
gdamore committed Sep 22, 2017
1 parent f04cfd2 commit e236dc8
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 76 deletions.
29 changes: 19 additions & 10 deletions src/core/endpt.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct nni_ep {
uint64_t ep_id; // endpoint id
nni_list_node ep_node; // per socket list
nni_sock * ep_sock;
char ep_addr[NNG_MAXADDRLEN];
char ep_url[NNG_MAXADDRLEN];
int ep_mode;
int ep_started;
int ep_closed; // full shutdown
Expand Down Expand Up @@ -82,6 +82,12 @@ nni_ep_id(nni_ep *ep)
return ((uint32_t) ep->ep_id);
}

const char *
nni_ep_url(nni_ep *ep)
{
return (ep->ep_url);
}

static void
nni_ep_destroy(nni_ep *ep)
{
Expand Down Expand Up @@ -117,16 +123,16 @@ nni_ep_destroy(nni_ep *ep)
}

static int
nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
nni_ep_create(nni_ep **epp, nni_sock *s, const char *url, int mode)
{
nni_tran *tran;
nni_ep * ep;
int rv;

if ((tran = nni_tran_find(addr)) == NULL) {
if ((tran = nni_tran_find(url)) == NULL) {
return (NNG_ENOTSUP);
}
if (strlen(addr) >= NNG_MAXADDRLEN) {
if (strlen(url) >= NNG_MAXADDRLEN) {
return (NNG_EINVAL);
}

Expand All @@ -146,7 +152,7 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
// dereference on hot paths.
ep->ep_ops = *tran->tran_ep;

(void) nni_strlcpy(ep->ep_addr, addr, sizeof(ep->ep_addr));
(void) nni_strlcpy(ep->ep_url, url, sizeof(ep->ep_url));

NNI_LIST_NODE_INIT(&ep->ep_node);

Expand All @@ -159,7 +165,7 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
((rv = ep->ep_ops.ep_init(&ep->ep_data, url, s, mode)) != 0) ||
((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
((rv = nni_sock_ep_add(s, ep)) != 0)) {
nni_ep_destroy(ep);
Expand All @@ -171,15 +177,15 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
}

int
nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *addr)
nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *url)
{
return (nni_ep_create(epp, s, addr, NNI_EP_MODE_DIAL));
return (nni_ep_create(epp, s, url, NNI_EP_MODE_DIAL));
}

int
nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *addr)
nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *url)
{
return (nni_ep_create(epp, s, addr, NNI_EP_MODE_LISTEN));
return (nni_ep_create(epp, s, url, NNI_EP_MODE_LISTEN));
}

int
Expand Down Expand Up @@ -603,6 +609,9 @@ nni_ep_getopt(nni_ep *ep, int opt, void *valp, size_t *szp)
{
int rv;

if (opt == nng_optid_url) {
return (nni_getopt_str(ep->ep_url, valp, szp));
}
if (ep->ep_ops.ep_getopt == NULL) {
return (NNG_ENOTSUP);
}
Expand Down
41 changes: 21 additions & 20 deletions src/core/endpt.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,27 @@
#ifndef CORE_ENDPT_H
#define CORE_ENDPT_H

extern int nni_ep_sys_init(void);
extern void nni_ep_sys_fini(void);
extern nni_tran *nni_ep_tran(nni_ep *);
extern nni_sock *nni_ep_sock(nni_ep *);
extern int nni_ep_find(nni_ep **, uint32_t);
extern int nni_ep_hold(nni_ep *);
extern void nni_ep_rele(nni_ep *);
extern uint32_t nni_ep_id(nni_ep *);
extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *);
extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *);
extern void nni_ep_stop(nni_ep *);
extern int nni_ep_shutdown(nni_ep *);
extern void nni_ep_close(nni_ep *);
extern int nni_ep_dial(nni_ep *, int);
extern int nni_ep_listen(nni_ep *, int);
extern void nni_ep_list_init(nni_list *);
extern int nni_ep_setopt(nni_ep *, int, const void *, size_t, int);
extern int nni_ep_getopt(nni_ep *, int, void *, size_t *);
extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *);
extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *);
extern int nni_ep_sys_init(void);
extern void nni_ep_sys_fini(void);
extern nni_tran * nni_ep_tran(nni_ep *);
extern nni_sock * nni_ep_sock(nni_ep *);
extern int nni_ep_find(nni_ep **, uint32_t);
extern int nni_ep_hold(nni_ep *);
extern void nni_ep_rele(nni_ep *);
extern uint32_t nni_ep_id(nni_ep *);
extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *);
extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *);
extern void nni_ep_stop(nni_ep *);
extern int nni_ep_shutdown(nni_ep *);
extern void nni_ep_close(nni_ep *);
extern int nni_ep_dial(nni_ep *, int);
extern int nni_ep_listen(nni_ep *, int);
extern void nni_ep_list_init(nni_list *);
extern int nni_ep_setopt(nni_ep *, int, const void *, size_t, int);
extern int nni_ep_getopt(nni_ep *, int, void *, size_t *);
extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *);
extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *);
extern const char *nni_ep_url(nni_ep *);

// Endpoint modes. Currently used by transports. Remove this when we make
// transport dialers and listeners explicit.
Expand Down
14 changes: 14 additions & 0 deletions src/core/options.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,19 @@ nni_getopt_usec(nni_duration u, void *val, size_t *sizep)
return (0);
}

int
nni_getopt_sockaddr(const nng_sockaddr *sa, void *val, size_t *sizep)
{
size_t sz = sizeof(*sa);

if (sz > *sizep) {
sz = *sizep;
}
*sizep = sizeof(*sa);
memcpy(val, sa, sz);
return (0);
}

int
nni_getopt_int(int i, void *val, size_t *sizep)
{
Expand Down Expand Up @@ -403,6 +416,7 @@ nni_option_sys_init(void)
((rv = OPT_REGISTER(remaddr)) != 0) ||
((rv = OPT_REGISTER(recvfd)) != 0) ||
((rv = OPT_REGISTER(sendfd)) != 0) ||
((rv = OPT_REGISTER(url)) != 0) ||
((rv = OPT_REGISTER(req_resendtime)) != 0) ||
((rv = OPT_REGISTER(sub_subscribe)) != 0) ||
((rv = OPT_REGISTER(sub_unsubscribe)) != 0) ||
Expand Down
3 changes: 3 additions & 0 deletions src/core/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ extern int nni_getopt_u64(uint64_t, void *, size_t *);
// nni_getopt_str gets a C style string.
extern int nni_getopt_str(const char *, void *, size_t *);

// nni_getopt_sockaddr gets an nng_sockaddr.
extern int nni_getopt_sockaddr(const nng_sockaddr *, void *, size_t *);

// nni_setopt_size sets a size_t option.
extern int nni_setopt_size(size_t *, const void *, size_t, size_t, size_t);

Expand Down
64 changes: 53 additions & 11 deletions src/core/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ struct nni_pipe {
int p_reap;
int p_stop;
int p_refcnt;
const char * p_url;
nni_mtx p_mtx;
nni_cv p_cv;
nni_list_node p_reap_node;
nni_aio * p_start_aio;
};

static nni_idhash *nni_pipes;
static nni_mtx nni_pipe_lk;

static nni_list nni_pipe_reap_list;
static nni_mtx nni_pipe_reap_lk;
Expand All @@ -49,6 +51,7 @@ nni_pipe_sys_init(void)
int rv;

NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node);
nni_mtx_init(&nni_pipe_lk);
nni_mtx_init(&nni_pipe_reap_lk);
nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk);

Expand Down Expand Up @@ -85,6 +88,7 @@ nni_pipe_sys_fini(void)
nni_thr_fini(&nni_pipe_reap_thr);
nni_cv_fini(&nni_pipe_reap_cv);
nni_mtx_fini(&nni_pipe_reap_lk);
nni_mtx_fini(&nni_pipe_lk);
if (nni_pipes != NULL) {
nni_idhash_fini(nni_pipes);
nni_pipes = NULL;
Expand All @@ -103,11 +107,14 @@ nni_pipe_destroy(nni_pipe *p)

// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
nni_mtx_lock(&p->p_mtx);
nni_mtx_lock(&nni_pipe_lk);
if (p->p_id != 0) {
nni_idhash_remove(nni_pipes, p->p_id);
}
while (p->p_refcnt != 0) {
nni_cv_wait(&p->p_cv);
}
nni_mtx_unlock(&p->p_mtx);
nni_mtx_unlock(&nni_pipe_lk);

// We have exclusive access at this point, so we can check if
// we are still on any lists.
Expand All @@ -124,13 +131,35 @@ nni_pipe_destroy(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
if (p->p_id != 0) {
nni_idhash_remove(nni_pipes, p->p_id);
}
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
}

int
nni_pipe_find(nni_pipe **pp, uint32_t id)
{
int rv;
nni_pipe *p;
nni_mtx_lock(&nni_pipe_lk);
if ((rv = nni_idhash_find(nni_pipes, id, (void **) &p)) == 0) {
p->p_refcnt++;
*pp = p;
}
nni_mtx_unlock(&nni_pipe_lk);
return (rv);
}

void
nni_pipe_rele(nni_pipe *p)
{
nni_mtx_lock(&nni_pipe_lk);
p->p_refcnt--;
if (p->p_refcnt == 0) {
nni_cv_wake(&p->p_cv);
}
nni_mtx_unlock(&nni_pipe_lk);
}

// nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces.
uint32_t
nni_pipe_id(nni_pipe *p)
Expand Down Expand Up @@ -238,16 +267,21 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_proto_data = NULL;
p->p_ep = ep;
p->p_sock = sock;
p->p_url = nni_ep_url(ep);

NNI_LIST_NODE_INIT(&p->p_reap_node);
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);

nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p);

if (((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) ||
nni_mtx_lock(&nni_pipe_lk);
rv = nni_idhash_alloc(nni_pipes, &p->p_id, p);
nni_mtx_unlock(&nni_pipe_lk);

if ((rv != 0) ||
((rv = nni_ep_pipe_add(ep, p)) != 0) ||
((rv = nni_sock_pipe_add(sock, p)) != 0)) {
nni_pipe_destroy(p);
Expand All @@ -259,11 +293,19 @@ nni_pipe_create(nni_ep *ep, void *tdata)
int
nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
{
/* This should only be called with the mutex held... */
if (p->p_tran_ops.p_getopt == NULL) {
return (NNG_ENOTSUP);
int rv = NNG_ENOTSUP;

if (opt == nng_optid_url) {
return (nni_getopt_str(p->p_url, val, szp));
}
return (p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp));
if (p->p_tran_ops.p_getopt != NULL) {
rv = p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp);
}
if (rv == NNG_ENOTSUP) {
// Maybe its a generic socket option?
rv = nni_sock_getopt(p->p_sock, opt, val, szp);
}
return (rv);
}

void
Expand Down
9 changes: 8 additions & 1 deletion src/core/pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ extern void nni_pipe_start(nni_pipe *);

extern uint16_t nni_pipe_proto(nni_pipe *);
extern uint16_t nni_pipe_peer(nni_pipe *);
extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);

// nni_pipe_get_proto_data gets the protocol private data set with the
// nni_pipe_set_proto_data function. No locking is performed.
Expand All @@ -72,4 +72,11 @@ extern void nni_pipe_sock_list_init(nni_list *);
// a per-endpoint list.
extern void nni_pipe_ep_list_init(nni_list *);

// nni_pipe_find finds a pipe given its ID. It places a hold on the
// pipe, which must be released by the caller when it is done.
extern int nni_pipe_find(nni_pipe **, uint32_t);

// nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find.
extern void nni_pipe_rele(nni_pipe *);

#endif // CORE_PIPE_H
30 changes: 18 additions & 12 deletions src/nng.c
Original file line number Diff line number Diff line change
Expand Up @@ -663,30 +663,34 @@ nng_strerror(int num)
return ("Unknown error");
}

#if 0
int
nng_pipe_getopt(nng_pipe *pipe, int opt, void *val, size_t *sizep)
nng_pipe_getopt(nng_pipe id, int opt, void *val, size_t *sizep)
{
int rv;
int rv;
nni_pipe *p;

rv = nni_pipe_getopt(pipe, opt, val, sizep);
if (rv == ENOTSUP) {
// Maybe its a generic socket option.
rv = nni_sock_getopt(pipe->p_sock, opt, val, sizep);
if ((rv = nni_pipe_find(&p, id)) != 0) {
return (rv);
}
rv = nni_pipe_getopt(p, opt, val, sizep);
nni_pipe_rele(p);
return (rv);
}


int
nng_pipe_close(nng_pipe *pipe)
nng_pipe_close(nng_pipe id)
{
nni_pipe_close(pipe);
int rv;
nni_pipe *p;

if ((rv = nni_pipe_find(&p, id)) != 0) {
return (rv);
}
nni_pipe_close(p);
nni_pipe_rele(p);
return (0);
}

#endif

// Message handling.
int
nng_msg_alloc(nng_msg **msgp, size_t size)
Expand Down Expand Up @@ -1012,6 +1016,7 @@ const char *nng_opt_recvfd = "recv-fd";
const char *nng_opt_sendfd = "send-fd";
const char *nng_opt_locaddr = "local-address";
const char *nng_opt_remaddr = "remote-address";
const char *nng_opt_url = "url";
// Well known protocol options.
const char *nng_opt_req_resendtime = "req:resend-time";
const char *nng_opt_sub_subscribe = "sub:subscribe";
Expand All @@ -1034,6 +1039,7 @@ int nng_optid_recvfd;
int nng_optid_sendfd;
int nng_optid_locaddr;
int nng_optid_remaddr;
int nng_optid_url;
int nng_optid_req_resendtime;
int nng_optid_sub_subscribe;
int nng_optid_sub_unsubscribe;
Expand Down
Loading

0 comments on commit e236dc8

Please sign in to comment.