Skip to content

Commit

Permalink
Another intermediate commit - Added more rollback options
Browse files Browse the repository at this point in the history
  • Loading branch information
rsumbaly committed Apr 26, 2011
1 parent d06fcec commit 1a8ec6c
Show file tree
Hide file tree
Showing 10 changed files with 419 additions and 264 deletions.
27 changes: 17 additions & 10 deletions clients/python/voldemort/protocol/voldemort_admin_pb2.py

Large diffs are not rendered by default.

60 changes: 36 additions & 24 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.server.protocol.admin.AsyncOperationStatus;
import voldemort.server.rebalance.VoldemortRebalancingException;
import voldemort.store.ErrorCodeMapper;
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
Expand Down Expand Up @@ -87,6 +88,7 @@
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;

Expand Down Expand Up @@ -1696,6 +1698,7 @@ public void fetchPartitionFiles(int nodeId,
* Used in rebalancing to indicate change in states. Groups the partition
* plans on the basis of stealer nodes and sends them over
*
* @param existingCluster Current cluster
* @param transitionCluster Transition cluster
* @param rebalancePartitionPlanList The list of rebalance partition info
* plans
Expand All @@ -1706,29 +1709,26 @@ public void fetchPartitionFiles(int nodeId,
* @param changeRebalanceState Boolean indicating if we need to change
* rebalancing state
*/
public void rebalanceStateChange(Cluster transitionCluster,
public void rebalanceStateChange(Cluster existingCluster,
Cluster transitionCluster,
List<RebalancePartitionsInfo> rebalancePartitionPlanList,
Set<Integer> completedNodeIds,
boolean swapRO,
boolean changeClusterMetadata,
boolean changeRebalanceState) {
HashMap<Integer, List<RebalancePartitionsInfo>> stealerNodeToPlan = groupPartitionsInfoByStealerNode(rebalancePartitionPlanList);
Set<Integer> completedNodeIds = Sets.newHashSet();

int nodeId = 0;
try {
while(nodeId < transitionCluster.getNumberOfNodes()) {
boolean isStealerNode = false;
if(stealerNodeToPlan.containsKey(nodeId)) {
isStealerNode = true;
}

sendRebalanceStateChange(nodeId,
transitionCluster,
stealerNodeToPlan.get(nodeId),
swapRO,
changeClusterMetadata,
changeRebalanceState,
isStealerNode);
individualStateChange(nodeId,
transitionCluster,
stealerNodeToPlan.get(nodeId),
swapRO,
changeClusterMetadata,
changeRebalanceState,
stealerNodeToPlan.containsKey(nodeId),
false);
completedNodeIds.add(nodeId);
nodeId++;
}
Expand All @@ -1737,20 +1737,31 @@ public void rebalanceStateChange(Cluster transitionCluster,
logger.error("Got exceptions from node " + nodeId + " while changing state");

// Rollback changes on completed nodes
for(int completedNodeId: completedNodeIds) {
individualStateChange(completedNodeId,
existingCluster,
stealerNodeToPlan.get(completedNodeId),
swapRO,
changeClusterMetadata,
changeRebalanceState,
stealerNodeToPlan.containsKey(completedNodeId),
true);
}

throw new VoldemortException("Got exceptions from node " + nodeId
+ " nodes while changing state");
throw new VoldemortRebalancingException("Got exceptions from node " + nodeId
+ " while changing state");
}

}

private void sendRebalanceStateChange(int nodeId,
Cluster transitionCluster,
List<RebalancePartitionsInfo> rebalancePartitionPlanList,
boolean swapRO,
boolean changeClusterMetadata,
boolean changeRebalanceState,
boolean isStealerNode) {
public void individualStateChange(int nodeId,
Cluster cluster,
List<RebalancePartitionsInfo> rebalancePartitionPlanList,
boolean swapRO,
boolean changeClusterMetadata,
boolean changeRebalanceState,
boolean isStealerNode,
boolean rollback) {

// If we do not want to change the metadata and are not one of the
// stealer nodes, nothing to do
Expand Down Expand Up @@ -1780,7 +1791,8 @@ private void sendRebalanceStateChange(int nodeId,
VAdminProto.RebalanceStateChangeRequest getRebalanceStateChangeRequest = getRebalanceStateChangeRequestBuilder.setSwapRo(swapRO)
.setChangeClusterMetadata(changeClusterMetadata)
.setChangeRebalanceState(changeRebalanceState)
.setClusterString(clusterMapper.writeCluster(transitionCluster))
.setClusterString(clusterMapper.writeCluster(cluster))
.setRollback(rollback)
.build();
VAdminProto.VoldemortAdminRequest adminRequest = VAdminProto.VoldemortAdminRequest.newBuilder()
.setRebalanceStateChange(getRebalanceStateChangeRequest)
Expand Down
153 changes: 97 additions & 56 deletions src/java/voldemort/client/protocol/pb/VAdminProto.java

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public Cluster getTargetCluster() {
return targetCluster;
}

public Cluster getCurrentCluster() {
return currentCluster;
}

public List<RebalanceNodePlan> getOrderedRebalanceNodePlanList() {
return orderedRebalanceNodePlanList;
}
Expand Down Expand Up @@ -85,10 +89,6 @@ private RebalanceClusterPlan getRebalanceClusterPlan() {
return rebalanceClusterPlan;
}

private Cluster getCurrentCluster() {
return currentCluster;
}

/**
* Given a {@link RebalanceClusterPlan}, extracts the queue of
* {@link RebalanceNodePlan} and finally orders it
Expand Down
Loading

0 comments on commit 1a8ec6c

Please sign in to comment.