Skip to content

Commit

Permalink
Move msg delivery worker into AsyncSubscriptionImpl
Browse files Browse the repository at this point in the history
Since only async subscription can make use of a delivery worker
it made sense to have this in that class.
To reduce code duplication it was initially set into SubscriptionImpl.

Updated some unit tests to cover AsyncSubscriptionImpl getters/setters.
  • Loading branch information
kozlovic committed Oct 5, 2017
1 parent 3947cca commit 13d194d
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 183 deletions.
131 changes: 129 additions & 2 deletions src/main/java/io/nats/client/AsyncSubscriptionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
*/
class AsyncSubscriptionImpl extends SubscriptionImpl implements AsyncSubscription {

private MessageHandler msgHandler;
private MessageHandler msgHandler;
private MsgDeliveryWorker dlvWorker;

AsyncSubscriptionImpl(ConnectionImpl nc, String subj, String queue,
MessageHandler cb) {
Expand All @@ -33,12 +34,138 @@ public void start() {

@Override
public void setMessageHandler(MessageHandler cb) {
this.dlvWorkerLock();
this.msgHandler = cb;
this.dlvWorkerUnlock();
}

@Override
public MessageHandler getMessageHandler() {
return msgHandler;
this.dlvWorkerLock();
try {
return msgHandler;
} finally {
this.dlvWorkerUnlock();
}
}

@Override
public long getDelivered() {
this.dlvWorkerLock();
try {
return super.getDelivered();
} finally {
this.dlvWorkerUnlock();
}
}

@Override
public int getPendingMsgs() {
this.dlvWorkerLock();
try {
return super.getPendingMsgs();
} finally {
this.dlvWorkerUnlock();
}
}

@Override
public int getPendingBytes() {
this.dlvWorkerLock();
try {
return super.getPendingBytes();
} finally {
this.dlvWorkerUnlock();
}
}

@Override
public int getPendingMsgsMax() {
this.dlvWorkerLock();
try {
return super.getPendingMsgsMax();
} finally {
this.dlvWorkerUnlock();
}
}

@Override
public long getPendingBytesMax() {
this.dlvWorkerLock();
try {
return super.getPendingBytesMax();
} finally {
this.dlvWorkerUnlock();
}
}

@Override
public void setPendingLimits(int msgs, int bytes) {
this.dlvWorkerLock();
try {
super.setPendingLimits(msgs, bytes);
} finally {
this.dlvWorkerUnlock();
}
}

@Override
public int getPendingMsgsLimit() {
this.dlvWorkerLock();
try {
return super.getPendingMsgsLimit();
} finally {
this.dlvWorkerUnlock();
}
}

@Override
public void clearMaxPending() {
this.dlvWorkerLock();
try {
super.clearMaxPending();
} finally {
this.dlvWorkerUnlock();
}
}

@Override
public int getDropped() {
this.dlvWorkerLock();
try {
return super.getDropped();
} finally {
this.dlvWorkerUnlock();
}
}

@Override
public void close() {
this.dlvWorkerLock();
try {
super.close();
} finally {
this.dlvWorkerUnlock();
}
}

private void dlvWorkerLock() {
if (this.dlvWorker != null) {
this.dlvWorker.lock();
}
}

private void dlvWorkerUnlock() {
if (this.dlvWorker != null) {
this.dlvWorker.unlock();
}
}

MsgDeliveryWorker getDeliveryWorker() {
return this.dlvWorker;
}

void setDeliveryWorker(MsgDeliveryWorker worker) {
this.dlvWorker = worker;
}
}
7 changes: 5 additions & 2 deletions src/main/java/io/nats/client/ConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,10 @@ void processMsg(byte[] data, int offset, int length) {
// It's possible that we end up not using the message, but that's ok.
Message msg = new Message(parser.ps.ma, sub, data, offset, length);

final MsgDeliveryWorker mdw = sub.getDeliveryWorker();
MsgDeliveryWorker mdw = null;
if (sub instanceof AsyncSubscriptionImpl) {
mdw = ((AsyncSubscriptionImpl) sub).getDeliveryWorker();
}
if (mdw != null) {
mdw.lock();
} else {
Expand Down Expand Up @@ -1831,7 +1834,7 @@ SubscriptionImpl subscribe(String subject, String queue, MessageHandler cb,

sub = new AsyncSubscriptionImpl(this, subject, queue, cb, useDlvPool);
if (useDlvPool) {
msgDlvPool.assignDeliveryWorker(sub);
msgDlvPool.assignDeliveryWorker((AsyncSubscriptionImpl) sub);
} else {
// If we have an async callback, start up a sub specific Runnable to deliver the
// messages
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/io/nats/client/MsgDeliveryPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ synchronized int getSize() {
return this.workers.size();
}

synchronized void assignDeliveryWorker(SubscriptionImpl sub) {
synchronized void assignDeliveryWorker(AsyncSubscriptionImpl sub) {
int idx = this.idx;
if (++this.idx >= this.workers.size()) {
this.idx = 0;
Expand Down Expand Up @@ -139,7 +139,12 @@ public void run() {
if ((max > 0) && (delivered >= max))
{
// If we have hit the max for delivered msgs, remove sub.
nc.removeSub(sub);
nc.mu.lock();
try {
nc.removeSub(sub);
} finally {
nc.mu.unlock();
}
}

this.mu.lock();
Expand Down
48 changes: 0 additions & 48 deletions src/main/java/io/nats/client/SubscriptionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ abstract class SubscriptionImpl implements Subscription {
BlockingQueue<Message> mch;
Condition pCond;

// Ideally, this should be in AsyncSubscriptionImpl, but because of
// locking and to reduce code duplication, leave this here.
private MsgDeliveryWorker dlvWorker;

// Pending stats, async subscriptions, high-speed etc.
int pMsgs;
int pBytes;
Expand Down Expand Up @@ -187,9 +183,7 @@ public int getDropped() {
if (conn == null) {
throw new IllegalStateException(ERR_BAD_SUBSCRIPTION);
}
this.dlvWorkerLock();
rv = dropped;
this.dlvWorkerUnlock();
} finally {
mu.unlock();
}
Expand All @@ -204,9 +198,7 @@ public int getPendingMsgsMax() {
if (conn == null) {
throw new IllegalStateException(ERR_BAD_SUBSCRIPTION);
}
this.dlvWorkerLock();
rv = this.pMsgsMax;
this.dlvWorkerUnlock();
} finally {
mu.unlock();
}
Expand All @@ -222,9 +214,7 @@ public long getPendingBytesMax() {
if (conn == null) {
throw new IllegalStateException(ERR_BAD_SUBSCRIPTION);
}
this.dlvWorkerLock();
rv = this.pBytesMax;
this.dlvWorkerUnlock();
} finally {
mu.unlock();
}
Expand All @@ -244,9 +234,7 @@ void setPendingMsgsLimit(int pendingMsgsLimit) {
if (pendingMsgsLimit == 0) {
throw new IllegalArgumentException("nats: pending message limit cannot be zero");
}
this.dlvWorkerLock();
pMsgsLimit = pendingMsgsLimit;
this.dlvWorkerUnlock();
} finally {
mu.unlock();
}
Expand All @@ -258,9 +246,7 @@ void setPendingBytesLimit(int pendingBytesLimit) {
if (pendingBytesLimit == 0) {
throw new IllegalArgumentException("nats: pending message limit cannot be zero");
}
this.dlvWorkerLock();
pBytesLimit = pendingBytesLimit;
this.dlvWorkerUnlock();
} finally {
mu.unlock();
}
Expand All @@ -272,9 +258,7 @@ void setPendingMsgsMax(int max) {
if (conn == null) {
throw new IllegalStateException(ERR_BAD_SUBSCRIPTION);
}
this.dlvWorkerLock();
pMsgsMax = (max <= 0) ? 0 : max;
this.dlvWorkerUnlock();
} finally {
mu.unlock();
}
Expand All @@ -286,9 +270,7 @@ void setPendingBytesMax(int max) {
if (conn == null) {
throw new IllegalStateException(ERR_BAD_SUBSCRIPTION);
}
this.dlvWorkerLock();
pBytesMax = (max <= 0) ? 0 : max;
this.dlvWorkerUnlock();
} finally {
mu.unlock();
}
Expand Down Expand Up @@ -316,9 +298,7 @@ public long getDelivered() {
if (conn == null) {
throw new IllegalStateException(ERR_BAD_SUBSCRIPTION);
}
this.dlvWorkerLock();
rv = delivered;
this.dlvWorkerUnlock();
} finally {
mu.unlock();
}
Expand All @@ -333,9 +313,7 @@ public int getPendingBytes() {
if (conn == null) {
throw new IllegalStateException(ERR_BAD_SUBSCRIPTION);
}
this.dlvWorkerLock();
rv = pBytes;
this.dlvWorkerUnlock();
} finally {
mu.unlock();
}
Expand All @@ -346,9 +324,7 @@ public int getPendingBytes() {
public int getPendingBytesLimit() {
int rv;
mu.lock();
this.dlvWorkerLock();
rv = pBytesLimit;
this.dlvWorkerUnlock();
mu.unlock();
return rv;
}
Expand All @@ -361,9 +337,7 @@ public int getPendingMsgs() {
if (conn == null) {
throw new IllegalStateException(ERR_BAD_SUBSCRIPTION);
}
this.dlvWorkerLock();
rv = pMsgs;
this.dlvWorkerUnlock();
} finally {
mu.unlock();
}
Expand All @@ -374,9 +348,7 @@ public int getPendingMsgs() {
public int getPendingMsgsLimit() {
int rv;
mu.lock();
this.dlvWorkerLock();
rv = pMsgsLimit;
this.dlvWorkerUnlock();
mu.unlock();
return rv;
}
Expand Down Expand Up @@ -419,24 +391,4 @@ void lock() {
void unlock() {
mu.unlock();
}

private void dlvWorkerLock() {
if (this.dlvWorker != null) {
this.dlvWorker.lock();
}
}

private void dlvWorkerUnlock() {
if (this.dlvWorker != null) {
this.dlvWorker.unlock();
}
}

MsgDeliveryWorker getDeliveryWorker() {
return this.dlvWorker;
}

void setDeliveryWorker(MsgDeliveryWorker worker) {
this.dlvWorker = worker;
}
}
3 changes: 2 additions & 1 deletion src/test/java/io/nats/client/MsgDeliveryPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public void testMsgDeliveryPool() throws Exception {
assertEquals(7, pool.getSize());

final AtomicInteger received = new AtomicInteger(0);
ConnectionImpl nc = mock(ConnectionImpl.class);
Options opts = Nats.defaultOptions();
ConnectionImpl nc = new ConnectionImpl(opts);
AsyncSubscriptionImpl sub1 = new AsyncSubscriptionImpl(nc, "foo", null, new MessageHandler() {
public void onMessage(Message msg) {
received.incrementAndGet();
Expand Down
Loading

0 comments on commit 13d194d

Please sign in to comment.