Skip to content

Commit

Permalink
send no handlers failure if attempt to reply to a message that doesn'…
Browse files Browse the repository at this point in the history
…t have a reply address
  • Loading branch information
purplefox committed Nov 4, 2013
1 parent b98c518 commit 11b9f75
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ private <T> void sendReply(BaseMessage msg, Handler<Message<T>> replyHandler) {
}

private <T> void sendReplyWithTimeout(BaseMessage msg, long timeout, Handler<AsyncResult<Message<T>>> replyHandler) {
if (bus != null && replyAddress != null) {
if (bus != null) {
bus.sendReplyWithTimeout(sender, msg, timeout, replyHandler);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,12 @@ <T, U> void sendReply(ServerID dest, BaseMessage<U> message, Handler<Message<T>>
}

<T, U> void sendReplyWithTimeout(ServerID dest, BaseMessage<U> message, long timeout, Handler<AsyncResult<Message<T>>> replyHandler) {
Handler<Message<T>> handler = convertHandler(replyHandler);
sendOrPub(dest, message, handler, replyHandler, timeout);
if (message.address == null) {
sendNoHandlersFailure(replyHandler);
} else {
Handler<Message<T>> handler = convertHandler(replyHandler);
sendOrPub(dest, message, handler, replyHandler, timeout);
}
}

static <U> BaseMessage<U> createMessage(boolean send, String address, U message) {
Expand Down Expand Up @@ -883,16 +887,21 @@ private <T> void receiveMessage(final BaseMessage msg, final Handler<AsyncResult
} else {
// no handlers
if (asyncResultHandler != null) {
vertx.runOnContext(new Handler<Void>() {
@Override
public void handle(Void v) {
asyncResultHandler.handle(new DefaultFutureResult<Message<T>>(new ReplyException(ReplyFailure.NO_HANDLERS)));
}
});
sendNoHandlersFailure(asyncResultHandler);
}
}
}

private <T> void sendNoHandlersFailure(final Handler<AsyncResult<Message<T>>> handler) {
vertx.runOnContext(new Handler<Void>() {
@Override
public void handle(Void v) {
handler.handle(new DefaultFutureResult<Message<T>>(new ReplyException(ReplyFailure.NO_HANDLERS)));
}
});
}


private <T> void doReceive(final BaseMessage<T> msg, final HandlerHolder<T> holder) {
// Each handler gets a fresh copy
final Message<T> copied = msg.copy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ public void handle(final Message<String> message) {
@Override
public void handle(AsyncResult<Message<String>> event) {
tu.azzert(event.failed(), "Should not get a reply after timeout");
tu.azzert(event.cause() instanceof ReplyException);
ReplyException re = (ReplyException)event.cause();
tu.azzert(ReplyFailure.NO_HANDLERS == re.failureType());
tu.testComplete();
}
});
Expand Down

0 comments on commit 11b9f75

Please sign in to comment.