Skip to content

Commit

Permalink
fixed bug: assign router error
Browse files Browse the repository at this point in the history
  • Loading branch information
zh_yu committed Feb 19, 2019
1 parent 5d82f7e commit 041d1f5
Showing 1 changed file with 61 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import qunar.tc.qmq.common.ClientType;
import qunar.tc.qmq.configuration.DynamicConfig;
import qunar.tc.qmq.meta.BrokerGroup;
import qunar.tc.qmq.meta.BrokerState;
import qunar.tc.qmq.meta.cache.CachedMetaInfoManager;
import qunar.tc.qmq.meta.loadbalance.LoadBalance;
import qunar.tc.qmq.meta.loadbalance.RandomLoadBalance;
Expand Down Expand Up @@ -69,25 +70,30 @@ public List<BrokerGroup> route(final String subject, final MetaInfoRequest reque
}

private List<BrokerGroup> doRoute(String subject, int clientTypeCode) {
SubjectInfo subjectInfo = getOrCreateSubjectInfo(subject);

//query assigned brokers
final List<String> assignedBrokers = cachedMetaInfoManager.getGroups(subject);

List<String> newAssignedBrokers;
if (assignedBrokers == null || assignedBrokers.size() == 0) {
newAssignedBrokers = assignNewBrokers(subjectInfo, clientTypeCode);
} else {
newAssignedBrokers = reAssignBrokers(subjectInfo, assignedBrokers, clientTypeCode);
}

return selectExistedBrokerGroups(newAssignedBrokers);
}

private SubjectInfo getOrCreateSubjectInfo(String subject) {
SubjectInfo subjectInfo = cachedMetaInfoManager.getSubjectInfo(subject);
if (subjectInfo == null) {
// just add monitor event, will use broker_groups with default tag
QMon.subjectInfoNotFound(subject);
subjectInfo = new SubjectInfo();
subjectInfo.setName(subject);
}

//query assigned brokers
final List<String> cachedGroupNames = cachedMetaInfoManager.getGroups(subject);

List<String> routeBrokerGroupNames;
if (cachedGroupNames == null || cachedGroupNames.size() == 0) {
routeBrokerGroupNames = assignNewBrokers(subjectInfo, clientTypeCode);
} else {
routeBrokerGroupNames = reQueryAssignedBrokers(subjectInfo, cachedGroupNames, clientTypeCode);
}

return selectExistedBrokerGroups(routeBrokerGroupNames);
return subjectInfo;
}

private List<String> assignNewBrokers(SubjectInfo subjectInfo, int clientTypeCode) {
Expand All @@ -96,7 +102,7 @@ private List<String> assignNewBrokers(SubjectInfo subjectInfo, int clientTypeCod
}

String subject = subjectInfo.getName();
final List<String> brokerGroupNames = findBrokerGroupNamesFromCache(subjectInfo.getTag());
final List<String> brokerGroupNames = findAvailableBrokerGroupNames(subjectInfo.getTag());
final List<String> loadBalanceSelect = loadBalance.select(subject, brokerGroupNames, minGroupNum);
final int affected = store.insertSubjectRoute(subject, MIN_SUBJECT_ROUTE_VERSION, loadBalanceSelect);
if (affected == 1) {
Expand All @@ -106,13 +112,13 @@ private List<String> assignNewBrokers(SubjectInfo subjectInfo, int clientTypeCod
return findOrUpdateInStore(subjectInfo);
}

private List<String> reQueryAssignedBrokers(SubjectInfo subjectInfo, List<String> cachedGroupNames, int clientTypeCode) {
private List<String> reAssignBrokers(SubjectInfo subjectInfo, List<String> assignedBrokers, int clientTypeCode) {
if (clientTypeCode == ClientType.CONSUMER.getCode()) {
return cachedGroupNames;
return assignedBrokers;
}

if (cachedGroupNames.size() >= minGroupNum) {
return cachedGroupNames;
if (assignedBrokers.size() >= minGroupNum) {
return assignedBrokers;
}

return findOrUpdateInStore(subjectInfo);
Expand All @@ -124,39 +130,48 @@ private List<String> findOrUpdateInStore(final SubjectInfo subjectInfo) {
int tries = 0;

while (tries++ < MAX_UPDATE_RETRY_TIMES) {
final SubjectRoute subjectRoute = findSubjectRouteInStore(subject);
final List<String> oldBrokerGroupNames = subjectRoute.getBrokerGroups();
if (oldBrokerGroupNames.size() >= minGroupNum) {
return oldBrokerGroupNames;
}
final SubjectRoute subjectRoute = loadSubjectRoute(subject);
List<String> assignedBrokers = subjectRoute.getBrokerGroups();
if (assignedBrokers == null) assignedBrokers = new ArrayList<>();

final List<String> brokerGroupNames = findBrokerGroupNamesFromCache(subjectInfo.getTag());
if (brokerGroupNames.size() < minGroupNum) {
return oldBrokerGroupNames;
}
if (assignedBrokers.size() >= minGroupNum) return assignedBrokers;

final List<String> brokerGroupNames = findAvailableBrokerGroupNames(subjectInfo.getTag());
final List<String> idleBrokers = removeAssignedBrokers(brokerGroupNames, assignedBrokers);
if (idleBrokers.isEmpty()) return assignedBrokers;

final List<String> select = loadBalance.select(subject, brokerGroupNames, minGroupNum);
final List<String> merge = merge(oldBrokerGroupNames, select);
final List<String> newAssigned = loadBalance.select(subject, idleBrokers, minGroupNum - assignedBrokers.size());
final List<String> merge = merge(assignedBrokers, newAssigned);
final int affected = store.updateSubjectRoute(subject, subjectRoute.getVersion(), merge);
if (affected == 1) {
return select;
return merge;
}
}
throw new RuntimeException("find same room subject route error");
}

private List<String> removeAssignedBrokers(List<String> brokerGroupNames, List<String> assignedBrokers) {
List<String> result = new ArrayList<>();
for (String name : brokerGroupNames) {
if (assignedBrokers.contains(name)) continue;

result.add(name);
}
return result;
}

private List<String> merge(List<String> oldBrokerGroupNames, List<String> select) {
final Set<String> merge = new HashSet<>();
merge.addAll(oldBrokerGroupNames);
merge.addAll(select);
return new ArrayList<>(merge);
}

private SubjectRoute findSubjectRouteInStore(String subject) {
private SubjectRoute loadSubjectRoute(String subject) {
return store.selectSubjectRoute(subject);
}

private List<String> findBrokerGroupNamesFromCache(String tag) {
private List<String> findAvailableBrokerGroupNames(String tag) {
List<String> brokerGroupNames = cachedMetaInfoManager.getAllBrokerGroupNamesByTag(tag);
if (brokerGroupNames == null || brokerGroupNames.isEmpty()) {
brokerGroupNames = cachedMetaInfoManager.getAllDefaultTagBrokerGroupNames();
Expand All @@ -165,20 +180,27 @@ private List<String> findBrokerGroupNamesFromCache(String tag) {
if (brokerGroupNames == null || brokerGroupNames.isEmpty()) {
throw new RuntimeException("no broker groups");
}
return brokerGroupNames;

List<String> result = new ArrayList<>();
for (String name : brokerGroupNames) {
BrokerGroup brokerGroup = cachedMetaInfoManager.getBrokerGroup(name);
if (brokerGroup == null || brokerGroup.getBrokerState() == BrokerState.NRW) continue;
result.add(name);
}
return result;
}

private List<BrokerGroup> selectExistedBrokerGroups(final List<String> cachedGroupNames) {
if (cachedGroupNames == null || cachedGroupNames.isEmpty()) {
private List<BrokerGroup> selectExistedBrokerGroups(final List<String> brokerGroupNames) {
if (brokerGroupNames == null || brokerGroupNames.isEmpty()) {
return Collections.emptyList();
}
final List<BrokerGroup> cachedBrokerGroups = new ArrayList<>();
for (String groupName : cachedGroupNames) {
final BrokerGroup brokerGroup = cachedMetaInfoManager.getBrokerGroup(groupName);
final List<BrokerGroup> result = new ArrayList<>();
for (String name : brokerGroupNames) {
final BrokerGroup brokerGroup = cachedMetaInfoManager.getBrokerGroup(name);
if (brokerGroup != null) {
cachedBrokerGroups.add(brokerGroup);
result.add(brokerGroup);
}
}
return cachedBrokerGroups;
return result;
}
}

0 comments on commit 041d1f5

Please sign in to comment.