Skip to content

Commit

Permalink
Added a MultiJVM AdminTest.
Browse files Browse the repository at this point in the history
  • Loading branch information
bbansal committed Nov 16, 2009
1 parent 2fd35f9 commit cffd8d7
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 177 deletions.
7 changes: 5 additions & 2 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ public abstract int fetchAndUpdateStreams(int donorNodeId,
List<Integer> stealList,
VoldemortFilter filter);

/**
* cleanly close this client, freeing any resource.
*/
public abstract void stop();

/**
* Get the status of asynchornous request
*
Expand Down Expand Up @@ -212,8 +217,6 @@ public boolean waitForCompletion(int nodeId, int requestId, long maxWait, TimeUn
return false;
}

public abstract void close();

/* Helper functions */

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ private void close(Socket socket) {
}

@Override
public void close() {
public void stop() {
this.pool.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public void operate() {
}
}
} finally {
adminClient.close();
adminClient.stop();
}
}

Expand Down
4 changes: 2 additions & 2 deletions test/common/voldemort/ServerTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public static ProtoBuffAdminClientRequestFormat getAdminClient(Cluster cluster)
ClientConfig config = new ClientConfig();
config.setMaxConnectionsPerNode(2);
config.setConnectionTimeout(10000, TimeUnit.MILLISECONDS);
config.setSocketTimeout(10000, TimeUnit.MILLISECONDS);
config.setSocketTimeout(5 * 60 * 1000, TimeUnit.MILLISECONDS);
config.setSocketBufferSize(32 * 1024);

return new ProtoBuffAdminClientRequestFormat(cluster, config);
Expand All @@ -365,7 +365,7 @@ public static ProtoBuffAdminClientRequestFormat getAdminClient(String bootstrapU
ClientConfig config = new ClientConfig();
config.setMaxConnectionsPerNode(2);
config.setConnectionTimeout(10000, TimeUnit.MILLISECONDS);
config.setSocketTimeout(10000, TimeUnit.MILLISECONDS);
config.setSocketTimeout(5 * 60 * 1000, TimeUnit.MILLISECONDS);
config.setSocketBufferSize(32 * 1024);

return new ProtoBuffAdminClientRequestFormat(bootstrapURL, config);
Expand Down
135 changes: 135 additions & 0 deletions test/unit/voldemort/client/AbstractAdminServiceFilterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package voldemort.client;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;

import junit.framework.TestCase;
import voldemort.client.protocol.VoldemortFilter;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.store.Store;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.Pair;
import voldemort.versioning.Versioned;

public abstract class AbstractAdminServiceFilterTest extends TestCase {

private static String testStoreName = "test-replication-memory";

protected abstract AdminClient getAdminClient();

protected abstract Set<Pair<ByteArray, Versioned<byte[]>>> createEntries();

protected abstract Store<ByteArray, byte[]> getStore(int nodeId, String storeName);

public void testFetchAsStreamWithFilter() {
// user store should be present
Store<ByteArray, byte[]> store = getStore(0, testStoreName);
assertNotSame("Store '" + testStoreName + "' should not be null", null, store);

VoldemortFilter filter = new VoldemortFilterImpl();
int shouldFilterCount = 0;
for(Pair<ByteArray, Versioned<byte[]>> pair: createEntries()) {
store.put(pair.getFirst(), pair.getSecond());
if(!filter.accept(pair.getFirst(), pair.getSecond())) {
shouldFilterCount++;
}
}

assertNotSame("should be filtered key count shoud not be 0.", 0, shouldFilterCount);

// make fetch stream call with filter
Iterator<Pair<ByteArray, Versioned<byte[]>>> entryIterator = getAdminClient().fetchPartitionEntries(0,
testStoreName,
Arrays.asList(new Integer[] { 0 }),
filter);

// assert none of the filtered entries are returned.
while(entryIterator.hasNext()) {
Pair<ByteArray, Versioned<byte[]>> entry = entryIterator.next();
if(!filter.accept(entry.getFirst(), entry.getSecond())) {
fail();
}
}
}

public void testDeleteStreamWithFilter() {
// user store should be present
Store<ByteArray, byte[]> store = getStore(0, testStoreName);
assertNotSame("Store '" + testStoreName + "' should not be null", null, store);

Set<Pair<ByteArray, Versioned<byte[]>>> entrySet = createEntries();

VoldemortFilter filter = new VoldemortFilterImpl();
for(Pair<ByteArray, Versioned<byte[]>> pair: entrySet) {
store.put(pair.getFirst(), pair.getSecond());
}

// make delete stream call with filter
getAdminClient().deletePartitions(0,
testStoreName,
Arrays.asList(new Integer[] { 0, 1, 2, 3 }),
filter);

// assert none of the filtered entries are returned.
for(Pair<ByteArray, Versioned<byte[]>> entry: entrySet) {
if(filter.accept(entry.getFirst(), entry.getSecond())) {
assertEquals("All entries should be deleted except the filtered ones.",
0,
store.get(entry.getFirst()).size());
} else {
assertNotSame("filtered entry should be still present.",
0,
store.get(entry.getFirst()).size());
assertEquals("values should match",
new String(entry.getSecond().getValue()),
new String(store.get(entry.getFirst()).get(0).getValue()));
}
}
}

public void testUpdateAsStreamWithFilter() {
VoldemortFilter filter = new VoldemortFilterImpl();
Set<Pair<ByteArray, Versioned<byte[]>>> entrySet = createEntries();

// make update stream call with filter
getAdminClient().updateEntries(0, testStoreName, entrySet.iterator(), filter);

// assert none of the filtered entries are updated.
// user store should be present
Store<ByteArray, byte[]> store = getStore(0, testStoreName);

assertNotSame("Store '" + testStoreName + "' should not be null", null, store);

for(Pair<ByteArray, Versioned<byte[]>> entry: entrySet) {
if(filter.accept(entry.getFirst(), entry.getSecond())) {
assertEquals("Store should have this key/value pair",
1,
store.get(entry.getFirst()).size());
assertEquals("Store should have this key/value pair",
entry.getSecond(),
store.get(entry.getFirst()).get(0));
} else {
assertEquals("Store should Not have this key/value pair",
0,
store.get(entry.getFirst()).size());
}
}
}

public static class VoldemortFilterImpl implements VoldemortFilter {

public VoldemortFilterImpl() {
System.out.println("instantiating voldemortFilter");
}

public boolean accept(Object key, Versioned<?> value) {
String keyString = ByteUtils.getString(((ByteArray) key).get(), "UTF-8");
if(Integer.parseInt(keyString) % 10 == 3) {
return false;
}
return true;
}
}
}
Loading

0 comments on commit cffd8d7

Please sign in to comment.