Skip to content

Commit

Permalink
Fix possible NullPointerException when retry in send Async way
Browse files Browse the repository at this point in the history
  • Loading branch information
Jaskey authored and dongeforever committed Jun 6, 2017
1 parent 47fad3c commit 02acf1a
Showing 1 changed file with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -438,34 +438,40 @@ private void onExceptionImpl(final String brokerName, //
) {
int tmp = curTimes.incrementAndGet();
if (needRetry && tmp <= timesTotal) {
MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName());
String retryBrokerName = brokerName;//by default, it will send to the same broker
if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
retryBrokerName = mqChosen.getBrokerName();
}
String addr = instance.findBrokerAddressInPublish(retryBrokerName);
log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
tmpmq.getBrokerName());
retryBrokerName);
try {
request.setOpaque(RemotingCommand.createNewRequestId());
sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
timesTotal, curTimes, context, producer);
} catch (InterruptedException e1) {
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingConnectException e1) {
producer.updateFaultItem(brokerName, 3000, true);
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, true, producer);
} catch (RemotingTooMuchRequestException e1) {
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingException e1) {
producer.updateFaultItem(brokerName, 3000, true);
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, true, producer);
}
} else {

if (context != null) {
context.setException(e);
context.getProducer().executeSendMessageHookAfter(context);
}

try {
sendCallback.onException(e);
} catch (Exception ignored) {
Expand Down

0 comments on commit 02acf1a

Please sign in to comment.