forked from chenld/RocketMQ
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
50dcec8
commit 425dcba
Showing
12 changed files
with
148 additions
and
202 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,22 +15,6 @@ | |
*/ | ||
package com.alibaba.rocketmq.namesrv.routeinfo; | ||
|
||
import io.netty.channel.Channel; | ||
|
||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Map.Entry; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.locks.ReadWriteLock; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import com.alibaba.rocketmq.common.DataVersion; | ||
import com.alibaba.rocketmq.common.MixAll; | ||
import com.alibaba.rocketmq.common.TopicConfig; | ||
|
@@ -41,11 +25,20 @@ | |
import com.alibaba.rocketmq.common.protocol.route.BrokerData; | ||
import com.alibaba.rocketmq.common.protocol.route.QueueData; | ||
import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; | ||
import io.netty.channel.Channel; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.*; | ||
import java.util.Map.Entry; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.locks.ReadWriteLock; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
||
|
||
/** | ||
* 运行过程中的路由信息,数据只在内存,宕机后数据消失,但是Broker会定期推送最新数据 | ||
* | ||
* | ||
* @author shijia.wxr<[email protected]> | ||
* @since 2013-7-2 | ||
*/ | ||
|
@@ -100,8 +93,7 @@ private void createAndUpdateQueueData(final String brokerName, final TopicConfig | |
queueDataList.add(queueData); | ||
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); | ||
log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData); | ||
} | ||
else { | ||
} else { | ||
boolean addNewOne = true; | ||
|
||
Iterator<QueueData> it = queueDataList.iterator(); | ||
|
@@ -110,10 +102,9 @@ private void createAndUpdateQueueData(final String brokerName, final TopicConfig | |
if (qd.getBrokerName().equals(brokerName)) { | ||
if (qd.equals(queueData)) { | ||
addNewOne = false; | ||
} | ||
else { | ||
} else { | ||
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, | ||
queueData); | ||
queueData); | ||
it.remove(); | ||
} | ||
} | ||
|
@@ -130,13 +121,13 @@ private void createAndUpdateQueueData(final String brokerName, final TopicConfig | |
* @return 如果是slave,则返回master的ha地址 | ||
*/ | ||
public RegisterBrokerResult registerBroker(// | ||
final String clusterName,// 1 | ||
final String brokerAddr,// 2 | ||
final String brokerName,// 3 | ||
final long brokerId,// 4 | ||
final String haServerAddr,// 5 | ||
final TopicConfigSerializeWrapper topicConfigWrapper,// 6 | ||
final Channel channel// 7 | ||
final String clusterName,// 1 | ||
final String brokerAddr,// 2 | ||
final String brokerName,// 3 | ||
final long brokerId,// 4 | ||
final String haServerAddr,// 5 | ||
final TopicConfigSerializeWrapper topicConfigWrapper,// 6 | ||
final Channel channel// 7 | ||
) { | ||
RegisterBrokerResult result = new RegisterBrokerResult(); | ||
try { | ||
|
@@ -167,7 +158,7 @@ public RegisterBrokerResult registerBroker(// | |
if (null != topicConfigWrapper // | ||
&& MixAll.MASTER_ID == brokerId) { | ||
if (this.isBrokerTopicConfigChanged(brokerAddr,// | ||
topicConfigWrapper.getDataVersion())) { | ||
topicConfigWrapper.getDataVersion())) { | ||
ConcurrentHashMap<String, TopicConfig> tcTable = | ||
topicConfigWrapper.getTopicConfigTable(); | ||
if (tcTable != null) { | ||
|
@@ -181,11 +172,11 @@ public RegisterBrokerResult registerBroker(// | |
|
||
// 更新最后变更时间 | ||
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, // | ||
new BrokerLiveInfo(// | ||
System.currentTimeMillis(), // | ||
topicConfigWrapper.getDataVersion(),// | ||
channel, // | ||
haServerAddr)); | ||
new BrokerLiveInfo(// | ||
System.currentTimeMillis(), // | ||
topicConfigWrapper.getDataVersion(),// | ||
channel, // | ||
haServerAddr)); | ||
if (null == prevBrokerLiveInfo) { | ||
log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr); | ||
} | ||
|
@@ -201,12 +192,10 @@ public RegisterBrokerResult registerBroker(// | |
} | ||
} | ||
} | ||
} | ||
finally { | ||
} finally { | ||
this.lock.writeLock().unlock(); | ||
} | ||
} | ||
catch (Exception e) { | ||
} catch (Exception e) { | ||
log.error("registerBroker Exception", e); | ||
} | ||
|
||
|
@@ -215,19 +204,19 @@ public RegisterBrokerResult registerBroker(// | |
|
||
|
||
public void unregisterBroker(// | ||
final String clusterName,// 1 | ||
final String brokerAddr,// 2 | ||
final String brokerName,// 3 | ||
final long brokerId// 4 | ||
final String clusterName,// 1 | ||
final String brokerAddr,// 2 | ||
final String brokerName,// 3 | ||
final long brokerId// 4 | ||
) { | ||
try { | ||
try { | ||
this.lock.writeLock().lockInterruptibly(); | ||
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr); | ||
if (brokerLiveInfo != null) { | ||
log.info("unregisterBroker, remove from brokerLiveTable {}, {}", // | ||
(brokerLiveInfo != null ? "OK" : "Failed"),// | ||
brokerAddr// | ||
(brokerLiveInfo != null ? "OK" : "Failed"),// | ||
brokerAddr// | ||
); | ||
} | ||
|
||
|
@@ -236,14 +225,14 @@ public void unregisterBroker(// | |
if (null != brokerData) { | ||
String addr = brokerData.getBrokerAddrs().remove(brokerId); | ||
log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}", // | ||
(addr != null ? "OK" : "Failed"),// | ||
brokerAddr// | ||
(addr != null ? "OK" : "Failed"),// | ||
brokerAddr// | ||
); | ||
|
||
if (brokerData.getBrokerAddrs().isEmpty()) { | ||
this.brokerAddrTable.remove(brokerName); | ||
log.info("unregisterBroker, remove name from brokerAddrTable OK, {}", // | ||
brokerName// | ||
brokerName// | ||
); | ||
|
||
removeBrokerName = true; | ||
|
@@ -255,27 +244,25 @@ public void unregisterBroker(// | |
if (nameSet != null) { | ||
boolean removed = nameSet.remove(brokerName); | ||
log.info("unregisterBroker, remove name from clusterAddrTable {}, {}", // | ||
(removed ? "OK" : "Failed"),// | ||
brokerName// | ||
(removed ? "OK" : "Failed"),// | ||
brokerName// | ||
); | ||
|
||
if (nameSet.isEmpty()) { | ||
this.clusterAddrTable.remove(clusterName); | ||
log.info("unregisterBroker, remove cluster from clusterAddrTable {}", // | ||
clusterName// | ||
clusterName// | ||
); | ||
} | ||
} | ||
|
||
// 删除相应的topic | ||
this.removeTopicByBrokerName(brokerName); | ||
} | ||
} | ||
finally { | ||
} finally { | ||
this.lock.writeLock().unlock(); | ||
} | ||
} | ||
catch (Exception e) { | ||
} catch (Exception e) { | ||
log.error("unregisterBroker Exception", e); | ||
} | ||
} | ||
|
@@ -334,18 +321,16 @@ public TopicRouteData pickupTopicRouteData(final String topic) { | |
BrokerData brokerDataClone = new BrokerData(); | ||
brokerDataClone.setBrokerName(brokerData.getBrokerName()); | ||
brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData | ||
.getBrokerAddrs().clone()); | ||
.getBrokerAddrs().clone()); | ||
brokerDataList.add(brokerDataClone); | ||
foundBrokerData = true; | ||
} | ||
} | ||
} | ||
} | ||
finally { | ||
} finally { | ||
this.lock.readLock().unlock(); | ||
} | ||
} | ||
catch (Exception e) { | ||
} catch (Exception e) { | ||
log.error("pickupTopicRouteData Exception", e); | ||
} | ||
|
||
|
@@ -384,12 +369,10 @@ public void onChannelDestroy(String remoteAddr, Channel channel) { | |
break; | ||
} | ||
} | ||
} | ||
finally { | ||
} finally { | ||
this.lock.readLock().unlock(); | ||
} | ||
} | ||
catch (Exception e) { | ||
} catch (Exception e) { | ||
log.error("onChannelDestroy Exception", e); | ||
} | ||
|
||
|
@@ -421,8 +404,8 @@ public void onChannelDestroy(String remoteAddr, Channel channel) { | |
brokerNameFound = brokerData.getBrokerName(); | ||
it.remove(); | ||
log.info( | ||
"remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", | ||
brokerId, brokerAddr); | ||
"remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", | ||
brokerId, brokerAddr); | ||
break; | ||
} | ||
} | ||
|
@@ -432,7 +415,7 @@ public void onChannelDestroy(String remoteAddr, Channel channel) { | |
brokerNameDisappear = true; | ||
itBrokerAddrTable.remove(); | ||
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", | ||
brokerData.getBrokerName()); | ||
brokerData.getBrokerName()); | ||
} | ||
} | ||
|
||
|
@@ -446,8 +429,8 @@ public void onChannelDestroy(String remoteAddr, Channel channel) { | |
boolean removed = brokerNames.remove(brokerNameFound); | ||
if (removed) { | ||
log.info( | ||
"remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", | ||
brokerNameFound, clusterName); | ||
"remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", | ||
brokerNameFound, clusterName); | ||
break; | ||
} | ||
} | ||
|
@@ -468,25 +451,23 @@ public void onChannelDestroy(String remoteAddr, Channel channel) { | |
if (queueData.getBrokerName().equals(brokerNameFound)) { | ||
itQueueData.remove(); | ||
log.info( | ||
"remove topic[{} {}], from topicQueueTable, because channel destroyed", | ||
topic, queueData); | ||
"remove topic[{} {}], from topicQueueTable, because channel destroyed", | ||
topic, queueData); | ||
} | ||
} | ||
|
||
if (queueDataList.isEmpty()) { | ||
itTopicQueueTable.remove(); | ||
log.info( | ||
"remove topic[{}] all queue, from topicQueueTable, because channel destroyed", | ||
topic); | ||
"remove topic[{}] all queue, from topicQueueTable, because channel destroyed", | ||
topic); | ||
} | ||
} | ||
} | ||
} | ||
finally { | ||
} finally { | ||
this.lock.writeLock().unlock(); | ||
} | ||
} | ||
catch (Exception e) { | ||
} catch (Exception e) { | ||
log.error("onChannelDestroy Exception", e); | ||
} | ||
} | ||
|
@@ -509,12 +490,10 @@ public void printAllPeriodically() { | |
log.info("brokerLiveTable {}", this.brokerLiveTable); | ||
|
||
log.info("clusterAddrTable {}", this.clusterAddrTable); | ||
} | ||
finally { | ||
} finally { | ||
this.lock.readLock().unlock(); | ||
} | ||
} | ||
catch (Exception e) { | ||
} catch (Exception e) { | ||
log.error("printAllPeriodically Exception", e); | ||
} | ||
} | ||
|
@@ -529,7 +508,7 @@ class BrokerLiveInfo { | |
|
||
|
||
public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel, | ||
String haServerAddr) { | ||
String haServerAddr) { | ||
this.lastUpdateTimestamp = lastUpdateTimestamp; | ||
this.dataVersion = dataVersion; | ||
this.channel = channel; | ||
|
Oops, something went wrong.