Skip to content

Commit

Permalink
[ROCKETMQ-160]SendHeartBeat may not be logged in the expected period c…
Browse files Browse the repository at this point in the history
…loses apache#86
  • Loading branch information
Jaskey authored and dongeforever committed Jun 6, 2017
1 parent 787d128 commit 1f4b893
Showing 1 changed file with 34 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Thread newThread(Runnable r) {
private final RebalanceService rebalanceService;
private final DefaultMQProducer defaultMQProducer;
private final ConsumerStatsManager consumerStatsManager;
private final AtomicLong storeTimesTotal = new AtomicLong(0);
private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
private ServiceState serviceState = ServiceState.CREATE_JUST;
private DatagramSocket datagramSocket;
private Random random = new Random();
Expand Down Expand Up @@ -517,38 +517,40 @@ private void sendHeartbeatToAllBroker() {
return;
}

long times = this.storeTimesTotal.getAndIncrement();
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}

try {
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.error("send heart beat to broker exception", e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr);

try {
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr);
}
}
}
}
Expand Down

0 comments on commit 1f4b893

Please sign in to comment.