Skip to content

Commit

Permalink
refactored ProtobuffAdminClient to AdminClient.
Browse files Browse the repository at this point in the history
  • Loading branch information
bbansal committed Dec 15, 2009
1 parent f6efc91 commit 108c448
Show file tree
Hide file tree
Showing 19 changed files with 832 additions and 671 deletions.
1 change: 1 addition & 0 deletions src/java/voldemort/client/DefaultStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ public List<Node> getResponsibleNodes(K key) {
return strategy.routeRequest(keySerializer.toBytes(key));
}

@SuppressWarnings("unused")
private Version getVersion(K key) {
List<Version> versions = getVersions(key);
if(versions.size() == 0)
Expand Down
9 changes: 4 additions & 5 deletions src/java/voldemort/client/protocol/VoldemortFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package voldemort.client.protocol;

import voldemort.client.protocol.admin.AdminClient;
import voldemort.versioning.Versioned;

/**
Expand All @@ -26,16 +25,16 @@
* side filtering for
* <ul>
* <li>
* {@link AdminClient#deletePartitions(int, String, java.util.List, VoldemortFilter)}
* {@link AdminClientOLD#deletePartitions(int, String, java.util.List, VoldemortFilter)}
* </li>
* <li>
* {@link AdminClient#fetchPartitionEntries(int, String, java.util.List, VoldemortFilter)}
* {@link AdminClientOLD#fetchPartitionEntries(int, String, java.util.List, VoldemortFilter)}
* </li>
* <li>
* {@link AdminClient#fetchPartitionKeys(int, String, java.util.List, VoldemortFilter)}
* {@link AdminClientOLD#fetchPartitionKeys(int, String, java.util.List, VoldemortFilter)}
* </li>
* <li>
* {@link AdminClient#updateEntries(int, String, java.util.Iterator, VoldemortFilter)}
* {@link AdminClientOLD#updateEntries(int, String, java.util.Iterator, VoldemortFilter)}
* </li>
* </ul>
*
Expand Down
498 changes: 412 additions & 86 deletions src/java/voldemort/client/protocol/admin/AdminClient.java

Large diffs are not rendered by default.

352 changes: 352 additions & 0 deletions src/java/voldemort/client/protocol/admin/AdminClientOLD2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,352 @@
/*
* Copyright 2008-2009 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.client.protocol.admin;

import java.io.IOException;
import java.io.StringReader;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import voldemort.VoldemortException;
import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.protocol.VoldemortFilter;
import voldemort.cluster.Cluster;
import voldemort.server.protocol.admin.AsyncOperationStatus;
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.metadata.MetadataStore.VoldemortState;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.Pair;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;

/**
* The base interface for all administrative functions.
*
* @author bbansal
*
*/
public abstract class AdminClientOLD2 {

private static final ClusterMapper clusterMapper = new ClusterMapper();
private static final StoreDefinitionsMapper storeMapper = new StoreDefinitionsMapper();

// Parameters for exponential back off
private static final long INITIAL_DELAY = 250; // Initial delay
private static final long MAX_DELAY = 1000 * 60; // Stop doing exponential
// back off once delay
// reaches this

private Cluster cluster;

public AdminClientOLD2(Cluster cluster) {
this.setCluster(cluster);
}

public AdminClientOLD2(String bootstrapUrl) {
// try to bootstrap metadata from bootstrapUrl
ClientConfig config = new ClientConfig().setBootstrapUrls(bootstrapUrl);
SocketStoreClientFactory factory = new SocketStoreClientFactory(config);

// get Cluster from bootStrapUrl
String clusterXml = factory.bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY,
factory.validateUrls(config.getBootstrapUrls()));
setCluster(clusterMapper.readCluster(new StringReader(clusterXml)));

// release all threads/sockets hold by the factory.
factory.close();
}

/**
* streaming API to get all entries belonging to any of the partition in the
* input List.
*
* @param nodeId
* @param storeName
* @param partitionList: List of partitions to be fetched from remote
* server.
* @param filter: A VoldemortFilter class to do server side filtering or
* null
* @return
* @throws VoldemortException
*/
public abstract Iterator<Pair<ByteArray, Versioned<byte[]>>> fetchPartitionEntries(int nodeId,
String storeName,
List<Integer> partitionList,
VoldemortFilter filter);

/**
* streaming API to get a list of all the keys that belong to any of the
* partitions in the input list
*
* @param nodeId
* @param storeName
* @param partitionList
* @param filter
* @return
*/
public abstract Iterator<ByteArray> fetchPartitionKeys(int nodeId,
String storeName,
List<Integer> partitionList,
VoldemortFilter filter);

/**
* update Entries at (remote) node with all entries in iterator for passed
* storeName
*
* @param nodeId
* @param storeName
* @param entryIterator
* @param filter: A VoldemortFilter class to do server side filtering or
* null.
* @throws VoldemortException
* @throws IOException
*/
public abstract void updateEntries(int nodeId,
String storeName,
Iterator<Pair<ByteArray, Versioned<byte[]>>> entryIterator,
VoldemortFilter filter);

/**
* Delete all Entries at (remote) node for partitions in partitionList
*
* @param nodeId
* @param storeName
* @param partitionList
* @param filter: A VoldemortFilter class to do server side filtering or
* null.
* @throws VoldemortException
* @throws IOException
*/
public abstract int deletePartitions(int nodeId,
String storeName,
List<Integer> partitionList,
VoldemortFilter filter);

/**
* Pipe fetch from donorNode and update stealerNode in streaming mode.
*/
public abstract int fetchAndUpdateStreams(int donorNodeId,
int stealerNodeId,
String storeName,
List<Integer> stealList,
VoldemortFilter filter);

/**
* cleanly close this client, freeing any resource.
*/
public abstract void stop();

/**
* Get the status of asynchornous request
*
* @param nodeId Node to contact
* @param requestId Previously returned request Id
* @return A Pair of String (request status) and Boolean (is request
* complete?)
*/
public abstract AsyncOperationStatus getAsyncRequestStatus(int nodeId, int requestId);

/**
* update remote metadata on a particular node
*
* @param remoteNodeId
* @param key
* @param value
*/
protected abstract void doUpdateRemoteMetadata(int remoteNodeId,
ByteArray key,
Versioned<byte[]> value);

/**
* get remote metadata on a particular node
*
* @param remoteNodeId
* @param key
* @return
*/
protected abstract Versioned<byte[]> doGetRemoteMetadata(int remoteNodeId, ByteArray key);

/**
* Wait for a task to finish completion, using exponential backoff to poll
* the task completion status
*
* @param nodeId Id of the node to poll
* @param requestId Id of the request to check
* @param maxWait Maximum time we'll keep checking a request until we give
* up
* @param timeUnit Unit in which
* @param maxWait is expressed
* @throws VoldemortException if task failed to finish in specified maxWait
* time.
*/
public void waitForCompletion(int nodeId, int requestId, long maxWait, TimeUnit timeUnit) {
long delay = INITIAL_DELAY;
long waitUntil = System.currentTimeMillis() + timeUnit.toMillis(maxWait);

while(System.currentTimeMillis() < waitUntil) {
AsyncOperationStatus status = getAsyncRequestStatus(nodeId, requestId);
if(status.isComplete())
return;
if(delay < MAX_DELAY)
// keep doubling the wait period until we rach maxDelay
delay <<= 2;
try {
Thread.sleep(delay);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
throw new VoldemortException("Failed to finish task requestId:" + requestId + " in maxWait"
+ maxWait + " " + timeUnit.toString());
}

/* Helper functions */

/**
* update metadata at remote node.
*
* @param remoteNodeId
* @param key
* @param value
*/
public void updateRemoteMetadata(int remoteNodeId, String key, Versioned<String> value) {
ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(key, "UTF-8"));
Versioned<byte[]> valueBytes = new Versioned<byte[]>(ByteUtils.getBytes(value.getValue(),
"UTF-8"),
value.getVersion());

doUpdateRemoteMetadata(remoteNodeId, keyBytes, valueBytes);
}

/**
* get metadata from remote node.
*
* @param remoteNodeId
* @param key
* @return
*/
public Versioned<String> getRemoteMetadata(int remoteNodeId, String key) {
ByteArray keyBytes = new ByteArray(ByteUtils.getBytes(key, "UTF-8"));
Versioned<byte[]> value = doGetRemoteMetadata(remoteNodeId, keyBytes);
return new Versioned<String>(ByteUtils.getString(value.getValue(), "UTF-8"),
value.getVersion());
}

/**
* update cluster information on a remote node.
*
* @param nodeId
* @param cluster
* @throws VoldemortException
*/
public void updateRemoteCluster(int nodeId, Cluster cluster) throws VoldemortException {
// get current version.
VectorClock oldClock = (VectorClock) getRemoteCluster(nodeId).getVersion();

updateRemoteMetadata(nodeId,
MetadataStore.CLUSTER_KEY,
new Versioned<String>(clusterMapper.writeCluster(cluster),
oldClock.incremented(nodeId, 1)));
}

/**
* get cluster information from a remote node.
*
* @param nodeId
* @return
* @throws VoldemortException
*/
public Versioned<Cluster> getRemoteCluster(int nodeId) throws VoldemortException {
Versioned<String> value = getRemoteMetadata(nodeId, MetadataStore.CLUSTER_KEY);
Cluster cluster = clusterMapper.readCluster(new StringReader(value.getValue()));
return new Versioned<Cluster>(cluster, value.getVersion());

}

/**
* update store definitions on remote node.
*
* @param nodeId
* @param storesList
* @throws VoldemortException
*/
public void updateRemoteStoreDefList(int nodeId, List<StoreDefinition> storesList)
throws VoldemortException {
// get current version.
VectorClock oldClock = (VectorClock) getRemoteStoreDefList(nodeId).getVersion();

updateRemoteMetadata(nodeId,
MetadataStore.STORES_KEY,
new Versioned<String>(storeMapper.writeStoreList(storesList),
oldClock.incremented(nodeId, 1)));
}

/**
* get store definitions from a remote node.
*
* @param nodeId
* @return
* @throws VoldemortException
*/
public Versioned<List<StoreDefinition>> getRemoteStoreDefList(int nodeId)
throws VoldemortException {
Versioned<String> value = getRemoteMetadata(nodeId, MetadataStore.STORES_KEY);
List<StoreDefinition> storeList = storeMapper.readStoreList(new StringReader(value.getValue()));
return new Versioned<List<StoreDefinition>>(storeList, value.getVersion());
}

/**
* update serverState on a remote node.
*
* @param nodeId
* @param state
*/
public void updateRemoteServerState(int nodeId, MetadataStore.VoldemortState state) {
VectorClock oldClock = (VectorClock) getRemoteServerState(nodeId).getVersion();

updateRemoteMetadata(nodeId,
MetadataStore.SERVER_STATE_KEY,
new Versioned<String>(state.toString(),
oldClock.incremented(nodeId, 1)));
}

/**
* get serverState from a remoteNode.
*
* @param nodeId
* @return
*/
public Versioned<VoldemortState> getRemoteServerState(int nodeId) {
Versioned<String> value = getRemoteMetadata(nodeId, MetadataStore.SERVER_STATE_KEY);
return new Versioned<VoldemortState>(VoldemortState.valueOf(value.getValue()),
value.getVersion());
}

public void setCluster(Cluster cluster) {
this.cluster = cluster;
}

public Cluster getCluster() {
return cluster;
}
}
Loading

0 comments on commit 108c448

Please sign in to comment.