Skip to content

Commit

Permalink
Merge pull request nats-io#352 from nats-io/simplify_init_resp_mux
Browse files Browse the repository at this point in the history
Simplify creation of connection response mux
  • Loading branch information
kozlovic authored Jul 28, 2020
2 parents d3c1413 + 6662bb7 commit b151568
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 93 deletions.
84 changes: 24 additions & 60 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ _freeConn(natsConnection *nc)
natsConn_destroyRespPool(nc);
natsInbox_Destroy(nc->respSub);
natsStrHash_Destroy(nc->respMap);
natsCondition_Destroy(nc->respReady);
natsCondition_Destroy(nc->reconnectCond);
natsMutex_Destroy(nc->subsMu);
natsTimer_Destroy(nc->drainTimer);
Expand Down Expand Up @@ -1323,72 +1322,34 @@ natsConn_addRespInfo(respInfo **newResp, natsConnection *nc, char *respInbox, in
// Initialize some of the connection's fields used for request/reply mapping.
// Connection's lock is held on entry.
natsStatus
natsConn_initResp(natsConnection *nc, char *ginbox, int ginboxSize)
natsConn_initResp(natsConnection *nc, natsMsgHandler cb)
{
natsStatus s = NATS_OK;
char ginbox[NATS_INBOX_PRE_LEN + NUID_BUFFER_LEN + 1 + 1 + 1]; // _INBOX.<nuid>.*

nc->respPool = NATS_CALLOC(RESP_INFO_POOL_MAX_SIZE, sizeof(respInfo*));
if (nc->respPool == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);
if (s == NATS_OK)
s = natsCondition_Create(&nc->respReady);
if (s == NATS_OK)
s = natsStrHash_Create(&nc->respMap, 4);
if (s == NATS_OK)
s = natsInbox_Create(&nc->respSub);
if (s == NATS_OK)
snprintf(ginbox, ginboxSize, "%s.*", nc->respSub);

return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
natsConn_createRespMux(natsConnection *nc, char *ginbox, natsMsgHandler cb)
{
natsStatus s = NATS_OK;
natsSubscription *sub = NULL;

s = natsConn_subscribeNoPool(&sub, nc, ginbox, cb, (void*) nc);
if (s == NATS_OK)
{
// Between a successful creation of the subscription and
// the time we get the connection lock, the connection could
// have been closed. If that is the case, we need to
// release the subscription, otherwise keep track of it.
natsConn_Lock(nc);
if (natsConn_isClosed(nc))
{
natsSub_release(sub);
s = NATS_CONNECTION_CLOSED;
}
else
{
nc->respMux = sub;
}
// Signal possible threads waiting for the subscription
// to be ready.
natsCondition_Broadcast(nc->respReady);
natsConn_Unlock(nc);
snprintf(ginbox, sizeof(ginbox), "%s.*", nc->respSub);
s = natsConn_subscribeNoPoolNoLock(&(nc->respMux), nc, ginbox, cb, (void*) nc);
}
if (s != NATS_OK)
{
natsInbox_Destroy(nc->respSub);
nc->respSub = NULL;
natsStrHash_Destroy(nc->respMap);
nc->respMap = NULL;
NATS_FREE(nc->respPool);
nc->respPool = NULL;
}
return s;
}

natsStatus
natsConn_waitForRespMux(natsConnection *nc)
{
natsStatus s = NATS_OK;

natsConn_Lock(nc);

while (!natsConn_isClosed(nc) && (nc->respMux == NULL))
natsCondition_Wait(nc->respReady, nc->mu);

if (natsConn_isClosed(nc))
s = NATS_CONNECTION_CLOSED;

natsConn_Unlock(nc);

return s;
return NATS_UPDATE_ERR_STACK(s);
}

// This will clear any pending Request calls.
Expand Down Expand Up @@ -2829,11 +2790,10 @@ _badQueue(const char *queue)
return _checkSubjOrQueue(queue, false);
}

// subscribe is the internal subscribe function that indicates interest in a
// subject.
// subscribe is the internal subscribe function that indicates interest in a subject.
natsStatus
natsConn_subscribeImpl(natsSubscription **newSub,
natsConnection *nc, const char *subj, const char *queue,
natsConnection *nc, bool lock, const char *subj, const char *queue,
int64_t timeout, natsMsgHandler cb, void *cbClosure,
bool preventUseOfLibDlvPool)
{
Expand All @@ -2849,18 +2809,21 @@ natsConn_subscribeImpl(natsSubscription **newSub,
if ((queue != NULL) && ((strlen(subj) == 0) || _badQueue(queue)))
return nats_setDefaultError(NATS_INVALID_QUEUE_NAME);

natsConn_Lock(nc);
if (lock)
natsConn_Lock(nc);

if (natsConn_isClosed(nc))
{
natsConn_Unlock(nc);
if (lock)
natsConn_Unlock(nc);

return nats_setDefaultError(NATS_CONNECTION_CLOSED);
}

if (natsConn_isDraining(nc))
{
natsConn_Unlock(nc);
if (lock)
natsConn_Unlock(nc);

return nats_setDefaultError(NATS_DRAINING);
}
Expand Down Expand Up @@ -2926,7 +2889,8 @@ natsConn_subscribeImpl(natsSubscription **newSub,
natsSub_release(sub);
}

natsConn_Unlock(nc);
if (lock)
natsConn_Unlock(nc);

return NATS_UPDATE_ERR_STACK(s);
}
Expand Down
17 changes: 6 additions & 11 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,19 @@ natsConn_processPing(natsConnection *nc);
void
natsConn_processPong(natsConnection *nc);

#define natsConn_subscribeNoPool(sub, nc, subj, cb, closure) natsConn_subscribeImpl((sub), (nc), (subj), NULL, 0, (cb), (closure), true)
#define natsConn_subscribeNoPool(sub, nc, subj, cb, closure) natsConn_subscribeImpl((sub), (nc), true, (subj), NULL, 0, (cb), (closure), true)
#define natsConn_subscribeNoPoolNoLock(sub, nc, subj, cb, closure) natsConn_subscribeImpl((sub), (nc), false, (subj), NULL, 0, (cb), (closure), true)
#define natsConn_subscribeSyncNoPool(sub, nc, subj) natsConn_subscribeNoPool((sub), (nc), (subj), NULL, NULL)
#define natsConn_subscribeWithTimeout(sub, nc, subj, timeout, cb, closure) natsConn_subscribeImpl((sub), (nc), (subj), NULL, (timeout), (cb), (closure), false)
#define natsConn_subscribeWithTimeout(sub, nc, subj, timeout, cb, closure) natsConn_subscribeImpl((sub), (nc), true, (subj), NULL, (timeout), (cb), (closure), false)
#define natsConn_subscribe(sub, nc, subj, cb, closure) natsConn_subscribeWithTimeout((sub), (nc), (subj), 0, (cb), (closure))
#define natsConn_subscribeSync(sub, nc, subj) natsConn_subscribe((sub), (nc), (subj), NULL, NULL)
#define natsConn_queueSubscribeWithTimeout(sub, nc, subj, queue, timeout, cb, closure) natsConn_subscribeImpl((sub), (nc), (subj), (queue), (timeout), (cb), (closure), false)
#define natsConn_queueSubscribeWithTimeout(sub, nc, subj, queue, timeout, cb, closure) natsConn_subscribeImpl((sub), (nc), true, (subj), (queue), (timeout), (cb), (closure), false)
#define natsConn_queueSubscribe(sub, nc, subj, queue, cb, closure) natsConn_queueSubscribeWithTimeout((sub), (nc), (subj), (queue), 0, (cb), (closure))
#define natsConn_queueSubscribeSync(sub, nc, subj, queue) natsConn_queueSubscribe((sub), (nc), (subj), (queue), NULL, NULL)

natsStatus
natsConn_subscribeImpl(natsSubscription **newSub,
natsConnection *nc, const char *subj, const char *queue,
natsConnection *nc, bool lock, const char *subj, const char *queue,
int64_t timeout, natsMsgHandler cb, void *cbClosure,
bool preventUseOfLibDlvPool);

Expand Down Expand Up @@ -113,13 +114,7 @@ void
natsConn_disposeRespInfo(natsConnection *nc, respInfo *resp, bool needsLock);

natsStatus
natsConn_createRespMux(natsConnection *nc, char *ginbox, natsMsgHandler cb);

natsStatus
natsConn_waitForRespMux(natsConnection *nc);

natsStatus
natsConn_initResp(natsConnection *nc, char *ginbox, int ginboxSize);
natsConn_initResp(natsConnection *nc, natsMsgHandler cb);

void
natsConn_destroyRespPool(natsConnection *nc);
Expand Down
1 change: 0 additions & 1 deletion src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,6 @@ struct __natsConnection
int respIdVal;
char *respSub; // The wildcard subject
natsSubscription *respMux; // A single response subscription
natsCondition *respReady; // For race when initializing the wildcard subscription
natsStrHash *respMap; // Request map for the response msg
respInfo **respPool;
int respPoolSize;
Expand Down
24 changes: 3 additions & 21 deletions src/pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,7 @@ natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,
{
natsStatus s = NATS_OK;
respInfo *resp = NULL;
bool createSub = false;
bool needsRemoval= true;
bool waitForSub = false;
char ginbox[NATS_INBOX_PRE_LEN + NUID_BUFFER_LEN + 1 + 1 + 1]; // _INBOX.<nuid>.*
char respInbox[NATS_INBOX_PRE_LEN + NUID_BUFFER_LEN + 1 + NATS_MAX_REQ_ID_LEN + 1]; // _INBOX.<nuid>.<reqId>

if ((replyMsg == NULL) || (nc == NULL) || (m == NULL))
Expand All @@ -414,29 +411,14 @@ natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,
// the connection object.
natsConn_retain(nc);

// Setup only once
if (nc->respReady == NULL)
{
s = natsConn_initResp(nc, ginbox, sizeof(ginbox));
createSub = (s == NATS_OK);
}
// Setup only once (but could be more if natsConn_initResp() returns != OK)
if (nc->respMux == NULL)
s = natsConn_initResp(nc, _respHandler);
if (s == NATS_OK)
s = natsConn_addRespInfo(&resp, nc, respInbox, sizeof(respInbox));

// If multiple requests are performed in parallel, only
// one will create the wildcard subscriptions, but the
// others need to wait for the subscription to be setup
// before publishing the message.
if (s == NATS_OK)
waitForSub = (nc->respMux == NULL);

natsConn_Unlock(nc);

if ((s == NATS_OK) && createSub)
s = natsConn_createRespMux(nc, ginbox, _respHandler);
else if ((s == NATS_OK) && waitForSub)
s = natsConn_waitForRespMux(nc);

if (s == NATS_OK)
{
m->reply = (const char*) respInbox;
Expand Down

0 comments on commit b151568

Please sign in to comment.