Skip to content

Commit

Permalink
GEODE-10261: VMProvider.invokeAsync uses appropriate parameterization. (
Browse files Browse the repository at this point in the history
apache#7631)

* Cleanup use of AsyncInvocation.
* Add generic parameters to AsyncInvocation variables where they were missing.
* Cleaned up by changing single element arrays to variables.
  • Loading branch information
Patrick Johnson authored Jun 15, 2022
1 parent e2ac111 commit c9c772d
Show file tree
Hide file tree
Showing 144 changed files with 831 additions and 896 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,8 @@ public void run2() throws CacheException {
};

logger.info("before initialize client");
AsyncInvocation inv2 = vm2.invokeAsync("Initialize Client", initializeClient);
AsyncInvocation inv3 = vm3.invokeAsync("Initialize Client", initializeClient);
AsyncInvocation<Void> inv2 = vm2.invokeAsync("Initialize Client", initializeClient);
AsyncInvocation<Void> inv3 = vm3.invokeAsync("Initialize Client", initializeClient);

inv2.await();
inv3.await();
Expand Down Expand Up @@ -526,7 +526,7 @@ public void close() {}
// Initialize each client with entries (so that afterInvalidate is called)

logger.info("before initialize client");
AsyncInvocation inv2 = client.invokeAsync("Initialize Client", () -> {
AsyncInvocation<Void> inv2 = client.invokeAsync("Initialize Client", () -> {
Region<Object, Object> region = getRootRegion().getSubregion(name);
PoolStats stats = ((PoolImpl) PoolManager.find(poolName)).getStats();
int oldConnects = stats.getConnects();
Expand Down Expand Up @@ -780,8 +780,8 @@ public void basicTestLifetimeExpire()
throws CacheException, InterruptedException {
final String name = getName();

AsyncInvocation putAI = null;
AsyncInvocation putAI2 = null;
AsyncInvocation<Void> putAI = null;
AsyncInvocation<Void> putAI2 = null;

try {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,8 @@ public void clientSlowToHandshakeDoesNotBlockServer() throws Throwable {

String hostName = getHostName();

AsyncInvocation slowAsync = slowClientVM.invokeAsync(() -> connectToServer(hostName, port));
AsyncInvocation<Void> slowAsync =
slowClientVM.invokeAsync(() -> connectToServer(hostName, port));
try {
getBlackboard().waitForGate("serverIsBlocked", 60, TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2016,8 +2016,8 @@ public void testDRLoadRejection() throws Exception {
final int fakeHeapMaxSize = 1000;

// Make sure the desired VMs will have a fresh DS.
AsyncInvocation d1 = replicate1.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
AsyncInvocation d2 = replicate2.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
AsyncInvocation<Void> d1 = replicate1.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
AsyncInvocation<Void> d2 = replicate2.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
d1.join();
assertFalse(d1.exceptionOccurred());
d2.join();
Expand Down Expand Up @@ -2232,8 +2232,8 @@ public void testPRLoadRejection() throws Exception {
final int fakeHeapMaxSize = 1000;

// Make sure the desired VMs will have a fresh DS.
AsyncInvocation d0 = accessor.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
AsyncInvocation d1 = ds1.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
AsyncInvocation<Void> d0 = accessor.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
AsyncInvocation<Void> d1 = ds1.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
d0.join();
assertFalse(d0.exceptionOccurred());
d1.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,8 @@ public void testDRLoadRejection() throws Exception {
final String rName = getUniqueName();

// Make sure the desired VMs will have a fresh DS.
AsyncInvocation d1 = replicate1.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
AsyncInvocation d2 = replicate2.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
AsyncInvocation<Void> d1 = replicate1.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
AsyncInvocation<Void> d2 = replicate2.invokeAsync(JUnit4DistributedTestCase::disconnectFromDS);
d1.join();
assertFalse(d1.exceptionOccurred());
d2.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ public void run() {
}
};

AsyncInvocation future1 = vm0.invokeAsync(assignBuckets);
AsyncInvocation future2 = vm1.invokeAsync(assignBuckets);
AsyncInvocation future3 = vm2.invokeAsync(assignBuckets);
AsyncInvocation<Void> future1 = vm0.invokeAsync(assignBuckets);
AsyncInvocation<Void> future2 = vm1.invokeAsync(assignBuckets);
AsyncInvocation<Void> future3 = vm2.invokeAsync(assignBuckets);
future1.join(60 * 1000);
future2.join(60 * 1000);
future3.join(60 * 1000);
Expand Down Expand Up @@ -220,9 +220,9 @@ public void run() {
}
};

AsyncInvocation future1 = vm0.invokeAsync(assignBuckets);
AsyncInvocation future2 = vm1.invokeAsync(assignBuckets);
AsyncInvocation future3 = vm2.invokeAsync(assignBuckets);
AsyncInvocation<Void> future1 = vm0.invokeAsync(assignBuckets);
AsyncInvocation<Void> future2 = vm1.invokeAsync(assignBuckets);
AsyncInvocation<Void> future3 = vm2.invokeAsync(assignBuckets);
future1.join();
future2.join();
future3.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void run2() {

public void doQuery() throws InterruptedException {
final String[] qarr = {"1", "519", "181"};
AsyncInvocation as0 = vm0.invokeAsync(new CacheSerializableRunnable("Executing query") {
AsyncInvocation<Void> as0 = vm0.invokeAsync(new CacheSerializableRunnable("Executing query") {
@Override
public void run2() throws CacheException {
for (int i = 0; i < 50; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public Object call() throws Exception {
// Execute same query remotely from client using 2 threads
// Since this is a bind query, the query object will be shared
// between the 2 threads.
AsyncInvocation a1 = client.invokeAsync(new SerializableCallable("Query from client") {
AsyncInvocation<Void> a1 = client.invokeAsync(new SerializableCallable("Query from client") {
@Override
public Object call() throws Exception {
QueryService qs = null;
Expand All @@ -129,7 +129,7 @@ public Object call() throws Exception {
}
});

AsyncInvocation a2 = client.invokeAsync(new SerializableCallable("Query from client") {
AsyncInvocation<Void> a2 = client.invokeAsync(new SerializableCallable("Query from client") {
@Override
public Object call() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void testClientServerQueryUsingRemoteQueryService()
new int[] {port0, port1, port2}, true);

final int size = 100;
AsyncInvocation[] asyncInvocationArray = new AsyncInvocation[size];
AsyncInvocation<?>[] asyncInvocationArray = new AsyncInvocation[size];
for (int i = 0; i < size; i++) {
asyncInvocationArray[i] =
client.invokeAsync(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testCompactRangeIndex() {
// Invoke update from client and stop in updateIndex
// firesultSett before updating the RegionEntry and second after updating
// the RegionEntry.
AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
AsyncInvocation<Void> putThread = server.invokeAsync("update a Region Entry", () -> {
Region repRegion = cacheRule.getCache().getRegion(repRegionName);
IndexManager.testHook = new IndexManagerTestHook();
repRegion.put(new Integer("1"), new Portfolio(cntDest + 1));
Expand Down Expand Up @@ -196,7 +196,7 @@ public void testRangeIndex() {
// Invoke update from client and stop in updateIndex
// firesultSett before updating the RegionEntry and second after updating
// the RegionEntry.
AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
AsyncInvocation<Void> putThread = server.invokeAsync("update a Region Entry", () -> {
Cache cache = cacheRule.getCache();
Region repRegion = cache.getRegion(repRegionName);
IndexManager.testHook = new IndexManagerTestHook();
Expand Down Expand Up @@ -281,7 +281,7 @@ public void testRangeIndexWithIndexAndQueryFromClauseMisMatch() {
// Invoke update from client and stop in updateIndex
// firesultSett before updating the RegionEntry and second after updating
// the RegionEntry.
AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
AsyncInvocation<Void> putThread = server.invokeAsync("update a Region Entry", () -> {
Region repRegion = cacheRule.getCache().getRegion(repRegionName);
IndexManager.testHook = new IndexManagerTestHook();
// This portfolio with same ID must have different positions.
Expand Down Expand Up @@ -361,7 +361,7 @@ public void testRangeIndexWithIndexAndQueryFromClauseMisMatch2() {
// Invoke update from client and stop in updateIndex
// firesultSett before updating the RegionEntry and second after updating
// the RegionEntry.
AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
AsyncInvocation<Void> putThread = server.invokeAsync("update a Region Entry", () -> {
Cache cache = cacheRule.getCache();
Region repRegion = cache.getRegion(repRegionName);
IndexManager.testHook = new IndexManagerTestHook();
Expand Down Expand Up @@ -420,7 +420,7 @@ public void testRangeIndexWithIndexAndQueryFromClauseMisMatch2() {
await().until(joinThread(putThread));
}

private Callable<Boolean> joinThread(AsyncInvocation thread) {
private Callable<Boolean> joinThread(AsyncInvocation<Void> thread) {
return () -> {
try {
thread.join(100L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ public void createIndexesAndUpdatesAndThenValidate() throws InterruptedException
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);

AsyncInvocation ai1 = null;
AsyncInvocation ai2 = null;
AsyncInvocation<Void> ai1 = null;
AsyncInvocation<Void> ai2 = null;

vm0.invoke(QueryIndexDUnitTest::createIndex);
vm0.invoke(QueryIndexDUnitTest::validateIndexUsage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ public void testCreateIndexThroughXML() throws Exception {

getLogWriter().info("Creating index using an xml file name : " + CACHE_XML_FILE_NAME);

AsyncInvocation async0 = vm0.invokeAsync(createIndexThroughXML(NAME));
AsyncInvocation async1 = vm1.invokeAsync(createIndexThroughXML(NAME));
AsyncInvocation<Void> async0 = vm0.invokeAsync(createIndexThroughXML(NAME));
AsyncInvocation<Void> async1 = vm1.invokeAsync(createIndexThroughXML(NAME));

async1.await();
async0.await();
Expand Down Expand Up @@ -365,7 +365,7 @@ public void testCreateAsyncIndexWhileDoingGII() throws Exception {

getLogWriter().info("Creating index using an xml file name : " + CACHE_XML_FILE_NAME);

AsyncInvocation async0 = vm0.invokeAsync(createIndexThroughXML(NAME));
AsyncInvocation<Void> async0 = vm0.invokeAsync(createIndexThroughXML(NAME));

async0.await();

Expand All @@ -374,7 +374,7 @@ public void testCreateAsyncIndexWhileDoingGII() throws Exception {

vm1.invoke(setTestHook());

AsyncInvocation async1 = vm1.invokeAsync(createIndexThroughXML(NAME));
AsyncInvocation<Void> async1 = vm1.invokeAsync(createIndexThroughXML(NAME));

vm0.invoke(prIndexCreationCheck(NAME, STATUS_INDEX, 50));

Expand Down Expand Up @@ -451,7 +451,7 @@ public void testCreateAsyncIndexWhileDoingGIIAndQuery() throws Exception {

getLogWriter().info("Creating index using an xml file name : " + CACHE_XML_FILE_NAME);

AsyncInvocation async0 = vm0.invokeAsync(createIndexThroughXML(NAME));
AsyncInvocation<Void> async0 = vm0.invokeAsync(createIndexThroughXML(NAME));

async0.await();

Expand All @@ -460,7 +460,7 @@ public void testCreateAsyncIndexWhileDoingGIIAndQuery() throws Exception {

vm1.invoke(setTestHook());

AsyncInvocation async1 = vm1.invokeAsync(createIndexThroughXML(NAME));
AsyncInvocation<Void> async1 = vm1.invokeAsync(createIndexThroughXML(NAME));

async1.await();
async0.await();
Expand Down Expand Up @@ -496,7 +496,7 @@ public void testCreateAsyncIndexWhileDoingGIIAndCompareQueryResults() throws Exc
vm0.invokeAsync(loadRegion(NAME, 500));
vm0.invokeAsync(loadRegion(REP_REG_NAME, 500));

AsyncInvocation async0 = vm0.invokeAsync(loadRegion(PERSISTENT_REG_NAME, 500));
AsyncInvocation<Void> async0 = vm0.invokeAsync(loadRegion(PERSISTENT_REG_NAME, 500));

vm0.invokeAsync(loadRegion(NO_INDEX_REP_REG, 500));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public void testPRGatherCancellation() throws Throwable {
DefaultQuery.testHook = getPauseHook(true, controller);
});
// remove from here to ....
AsyncInvocation queryExecution1 = executeQueryOnClient(client);
AsyncInvocation<Integer> queryExecution1 = executeQueryOnClient(client);

// Gives async invocation a chance to start
Thread.sleep(1000);
Expand All @@ -291,7 +291,7 @@ public void testPRGatherCancellation() throws Throwable {
});
// We simulate a low memory/critical heap percentage hit
setHeapToCriticalAndReleaseLatch(server1);
AsyncInvocation queryExecution = executeQueryOnClient(client);
AsyncInvocation<Integer> queryExecution = executeQueryOnClient(client);
await().untilAsserted(() -> assertThat(queryExecution.get()).isEqualTo(0));

verifyDroppedObjectsAndSetHeapToNormal(server1);
Expand All @@ -302,7 +302,7 @@ public void testPRGatherCancellation() throws Throwable {
}
}

private AsyncInvocation executeQueryOnClient(VM client) {
private AsyncInvocation<Integer> executeQueryOnClient(VM client) {
return client.invokeAsync("execute query from client", () -> {
try {
Query query1 =
Expand Down Expand Up @@ -659,7 +659,7 @@ private void doTestCriticalHeapAndQueryTimeout(VM server, VM client,
throws InterruptedException {
createLatchTestHook(server, hitCriticalThreshold, VM.getController());

AsyncInvocation queryExecution = invokeClientQuery(client,
AsyncInvocation<Integer> queryExecution = invokeClientQuery(client,
disabledQueryMonitorForLowMem, queryTimeout, hitCriticalThreshold, VM.getController());

criticalMemoryCountDownLatch.await();
Expand All @@ -684,7 +684,7 @@ private void doTestCriticalHeapAndQueryTimeout(VM server, VM client,
private void executeQueryWithCriticalHeapCalledAfterTimeout(VM server, VM client)
throws InterruptedException {
createLatchTestHook(server, false, VM.getController());
AsyncInvocation queryExecution = executeQueryWithTimeout(client);
AsyncInvocation<Integer> queryExecution = executeQueryWithTimeout(client);

// Wait till the timeout expires on the query
Thread.sleep(1 + TEST_QUERY_TIMEOUT);
Expand All @@ -702,7 +702,7 @@ private void executeQueryWithCriticalHeapCalledAfterTimeout(VM server, VM client

}

private AsyncInvocation executeQueryWithTimeout(VM client) {
private AsyncInvocation<Integer> executeQueryWithTimeout(VM client) {
return client.invokeAsync("execute query from client", () -> {
QueryService qs;
try {
Expand Down Expand Up @@ -738,7 +738,7 @@ private AsyncInvocation executeQueryWithTimeout(VM client) {

}

private AsyncInvocation invokeClientQuery(VM client,
private AsyncInvocation<Integer> invokeClientQuery(VM client,
final boolean disabledQueryMonitorForLowMem,
final int queryTimeout,
final boolean hitCriticalThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void run2() throws CacheException {

// Start changing the value in Region which should turn into a deadlock if
// the fix is not there
AsyncInvocation asyncInv1 =
AsyncInvocation<Void> asyncInv1 =
vm0.invokeAsync(new CacheSerializableRunnable("Change value in region") {

@Override
Expand All @@ -164,7 +164,7 @@ public void run2() throws CacheException {
}
});

AsyncInvocation asyncInv2 =
AsyncInvocation<Void> asyncInv2 =
vm0.invokeAsync(new CacheSerializableRunnable("Run query on region") {

@Override
Expand Down Expand Up @@ -286,7 +286,7 @@ public void run2() throws CacheException {
}
});

AsyncInvocation asyncInv1 =
AsyncInvocation<Void> asyncInv1 =
vm1.invokeAsync(new CacheSerializableRunnable("Change value in region") {

@Override
Expand All @@ -302,7 +302,7 @@ public void run2() throws CacheException {
}
});

AsyncInvocation asyncInv2 =
AsyncInvocation<Void> asyncInv2 =
vm0.invokeAsync(new CacheSerializableRunnable("Run query on region") {

@Override
Expand Down Expand Up @@ -393,7 +393,7 @@ public void hook(int spot) throws RuntimeException {
}

// Asynch invocation for continuous index updates
AsyncInvocation indexUpdateAsysnch =
AsyncInvocation<Void> indexUpdateAsysnch =
vm0.invokeAsync(new CacheSerializableRunnable("index updates") {

@Override
Expand Down
Loading

0 comments on commit c9c772d

Please sign in to comment.