Skip to content

Commit

Permalink
[ROCKETMQ-203]Support client to allocate message queue in machine roo…
Browse files Browse the repository at this point in the history
…m nearby priority (apache#109)
  • Loading branch information
Jaskey authored and vongosling committed Jul 14, 2018
1 parent 6ae619c commit 461e516
Show file tree
Hide file tree
Showing 2 changed files with 381 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;

/**
* An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
* specified.
*
* If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine
* should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
* no alive consumer to monopolize them.
*/
public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
private final Logger log = ClientLogger.getLog();

private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
private final MachineRoomResolver machineRoomResolver;

public AllocateMachineRoomNearby(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MachineRoomResolver machineRoomResolver) throws NullPointerException {
if (allocateMessageQueueStrategy == null) {
throw new NullPointerException("allocateMessageQueueStrategy is null");
}

if (machineRoomResolver == null) {
throw new NullPointerException("machineRoomResolver is null");
}

this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
this.machineRoomResolver = machineRoomResolver;
}

@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}

List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}

//group mq by machine room
Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
for (MessageQueue mq : mqAll) {
String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
if (mr2Mq.get(brokerMachineRoom) == null) {
mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
}
mr2Mq.get(brokerMachineRoom).add(mq);
} else {
throw new IllegalArgumentException("Machine room is null for mq " + mq);
}
}

//group consumer by machine room
Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
for (String cid : cidAll) {
String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
if (mr2c.get(consumerMachineRoom) == null) {
mr2c.put(consumerMachineRoom, new ArrayList<String>());
}
mr2c.get(consumerMachineRoom).add(cid);
} else {
throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
}
}

List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();

//1.allocate the mq that deploy in the same machine room with the current consumer
String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
}

//2.allocate the rest mq to each machine room if there are no consumer alive in that machine room
for (String machineRoom : mr2Mq.keySet()) {
if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
}
}

return allocateResults;
}

@Override
public String getName() {
return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName();
}

/**
* A resolver object to determine which machine room do the message queues or clients are deployed in.
*
* AllocateMachineRoomNearby will use the results to group the message queues and clients by machine room.
*
* The result returned from the implemented method CANNOT be null.
*/
public interface MachineRoomResolver {
String brokerDeployIn(MessageQueue messageQueue);

String consumerDeployIn(String clientID);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;


public class AllocateMachineRoomNearByTest {

private static final String CID_PREFIX = "CID-";

private final String topic = "topic_test";
private final AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() {
@Override public String brokerDeployIn(MessageQueue messageQueue) {
return messageQueue.getBrokerName().split("-")[0];
}

@Override public String consumerDeployIn(String clientID) {
return clientID.split("-")[0];
}
};
private final AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver);


@Before
public void init() {
}


@Test
public void test1() {
testWhenIDCSizeEquals(5,20,10, false);
testWhenIDCSizeEquals(5,20,20, false);
testWhenIDCSizeEquals(5,20,30, false);
testWhenIDCSizeEquals(5,20,0, false );
}

@Test
public void test2() {
testWhenConsumerIDCIsMore(5,1,10, 10, false);
testWhenConsumerIDCIsMore(5,1,10, 5, false);
testWhenConsumerIDCIsMore(5,1,10, 20, false);
testWhenConsumerIDCIsMore(5,1,10, 0, false);
}

@Test
public void test3() {
testWhenConsumerIDCIsLess(5,2,10, 10, false);
testWhenConsumerIDCIsLess(5,2,10, 5, false);
testWhenConsumerIDCIsLess(5,2,10, 20, false);
testWhenConsumerIDCIsLess(5,2,10, 0, false);
}


@Test
public void testRun10RandomCase(){
for(int i=0;i<10;i++){
int consumerSize = new Random().nextInt(200)+1;//1-200
int queueSize = new Random().nextInt(100)+1;//1-100
int brokerIDCSize = new Random().nextInt(10)+1;//1-10
int consumerIDCSize = new Random().nextInt(10)+1;//1-10

if (brokerIDCSize == consumerIDCSize) {
testWhenIDCSizeEquals(brokerIDCSize,queueSize,consumerSize,false);
}
else if (brokerIDCSize > consumerIDCSize) {
testWhenConsumerIDCIsLess(brokerIDCSize,brokerIDCSize- consumerIDCSize, queueSize, consumerSize, false);
} else {
testWhenConsumerIDCIsMore(brokerIDCSize, consumerIDCSize - brokerIDCSize, queueSize, consumerSize, false);
}
}
}




public void testWhenIDCSizeEquals(int IDCSize, int queueSize, int consumerSize, boolean print) {
if (print) {
System.out.println("Test : IDCSize = "+ IDCSize +"queueSize = " + queueSize +" consumerSize = " + consumerSize);
}
List<String> cidAll = prepareConsumer(IDCSize, consumerSize);
List<MessageQueue> mqAll = prepareMQ(IDCSize, queueSize);
List<MessageQueue> resAll = new ArrayList<MessageQueue>();
for (String currentID : cidAll) {
List<MessageQueue> res = allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll);
if (print) {
System.out.println("cid: "+currentID+"--> res :" +res);
}
for (MessageQueue mq : res) {
Assert.assertTrue(machineRoomResolver.brokerDeployIn(mq).equals(machineRoomResolver.consumerDeployIn(currentID)));
}
resAll.addAll(res);
}
Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll));

if (print) {
System.out.println("-------------------------------------------------------------------");
}
}

public void testWhenConsumerIDCIsMore(int brokerIDCSize, int consumerMore, int queueSize, int consumerSize, boolean print) {
if (print) {
System.out.println("Test : IDCSize = "+ brokerIDCSize +" queueSize = " + queueSize +" consumerSize = " + consumerSize);
}
Set<String> brokerIDCWithConsumer = new TreeSet<String>();
List<String> cidAll = prepareConsumer(brokerIDCSize +consumerMore, consumerSize);
List<MessageQueue> mqAll = prepareMQ(brokerIDCSize, queueSize);
for (MessageQueue mq : mqAll) {
brokerIDCWithConsumer.add(machineRoomResolver.brokerDeployIn(mq));
}

List<MessageQueue> resAll = new ArrayList<MessageQueue>();
for (String currentID : cidAll) {
List<MessageQueue> res = allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll);
if (print) {
System.out.println("cid: "+currentID+"--> res :" +res);
}
for (MessageQueue mq : res) {
if (brokerIDCWithConsumer.contains(machineRoomResolver.brokerDeployIn(mq))) {//healthy idc, so only consumer in this idc should be allocated
Assert.assertTrue(machineRoomResolver.brokerDeployIn(mq).equals(machineRoomResolver.consumerDeployIn(currentID)));
}
}
resAll.addAll(res);
}

Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll));
if (print) {
System.out.println("-------------------------------------------------------------------");
}
}

public void testWhenConsumerIDCIsLess(int brokerIDCSize, int consumerIDCLess, int queueSize, int consumerSize, boolean print) {
if (print) {
System.out.println("Test : IDCSize = "+ brokerIDCSize +" queueSize = " + queueSize +" consumerSize = " + consumerSize);
}
Set<String> healthyIDC = new TreeSet<String>();
List<String> cidAll = prepareConsumer(brokerIDCSize - consumerIDCLess, consumerSize);
List<MessageQueue> mqAll = prepareMQ(brokerIDCSize, queueSize);
for (String cid : cidAll) {
healthyIDC.add(machineRoomResolver.consumerDeployIn(cid));
}

List<MessageQueue> resAll = new ArrayList<MessageQueue>();
Map<String, List<MessageQueue>> idc2Res = new TreeMap<String, List<MessageQueue>>();
for (String currentID : cidAll) {
String currentIDC = machineRoomResolver.consumerDeployIn(currentID);
List<MessageQueue> res = allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll);
if (print) {
System.out.println("cid: "+currentID+"--> res :" +res);
}
if ( !idc2Res.containsKey(currentIDC)) {
idc2Res.put(currentIDC, new ArrayList<MessageQueue>());
}
idc2Res.get(currentIDC).addAll(res);
resAll.addAll(res);
}

for (String consumerIDC : healthyIDC) {
List<MessageQueue> resInOneIDC = idc2Res.get(consumerIDC);
List<MessageQueue> mqInThisIDC = createMessageQueueList(consumerIDC,queueSize);
Assert.assertTrue(resInOneIDC.containsAll(mqInThisIDC));
}

Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll));
if (print) {
System.out.println("-------------------------------------------------------------------");
}
}


private boolean hasAllocateAllQ(List<String> cidAll,List<MessageQueue> mqAll, List<MessageQueue> allocatedResAll) {
if (cidAll.isEmpty()){
return allocatedResAll.isEmpty();
}
return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll) && mqAll.size() == allocatedResAll.size();
}


private List<String> createConsumerIdList(String machineRoom, int size) {
List<String> consumerIdList = new ArrayList<String>(size);
for (int i = 0; i < size; i++) {
consumerIdList.add(machineRoom +"-"+CID_PREFIX + String.valueOf(i));
}
return consumerIdList;
}

private List<MessageQueue> createMessageQueueList(String machineRoom, int size) {
List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(size);
for (int i = 0; i < size; i++) {
MessageQueue mq = new MessageQueue(topic, machineRoom+"-brokerName", i);
messageQueueList.add(mq);
}
return messageQueueList;
}

private List<MessageQueue> prepareMQ(int brokerIDCSize, int queueSize) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
for (int i=1;i<=brokerIDCSize;i++) {
mqAll.addAll(createMessageQueueList("IDC"+i, queueSize));
}

return mqAll;
}

private List<String> prepareConsumer( int IDCSize, int consumerSize) {
List<String> cidAll = new ArrayList<String>();
for (int i=1;i<=IDCSize;i++) {
cidAll.addAll(createConsumerIdList("IDC"+i, consumerSize));
}
return cidAll;
}
}

0 comments on commit 461e516

Please sign in to comment.