Skip to content

Commit

Permalink
java: add a new test for multiple sessions on a single client
Browse files Browse the repository at this point in the history
I was hoping this would reproduce KUDU-2053, a client bug that seems to
be hitting Spark upsert use cases, but it didn't reproduce it.
Nonetheless, we don't seem to have any test coverage of multithreaded
use of a KuduClient with different KuduSession objects, so this test is
valuable.

Change-Id: Ife7f02b160d4635e8acb0155c98a1ef9c3dbab5e
Reviewed-on: http://gerrit.cloudera.org:8080/7361
Reviewed-by: Jean-Daniel Cryans <[email protected]>
Tested-by: Jean-Daniel Cryans <[email protected]>
  • Loading branch information
toddlipcon authored and jdcryans committed Jul 12, 2017
1 parent ac2a285 commit 96cdcbd
Showing 1 changed file with 56 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kudu.client;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.Closeable;
Expand All @@ -26,13 +25,14 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;

import org.junit.Test;

import org.apache.kudu.client.SessionConfiguration.FlushMode;
import org.apache.kudu.util.CapturingLogAppender;

public class ITClientStress extends BaseKuduTest {
Expand Down Expand Up @@ -72,7 +72,9 @@ public void run() {
} finally {
pool.shutdown();
assertTrue(pool.awaitTermination(10, TimeUnit.SECONDS));
assertNull(thrown.get());
if (thrown.get() != null) {
throw new AssertionError(thrown.get());
}
}
assertFalse("log contained NPE",
cla.getAppendedText().contains("NullPointerException"));
Expand Down Expand Up @@ -111,4 +113,55 @@ public Void call() throws Exception {
}
});
}

/**
* Stress test which performs upserts from many sessions on different threads
* sharing the same KuduClient and KuduTable instance.
*/
@Test(timeout=60000)
public void testMultipleSessions() throws Exception {
final String TABLE_NAME = "testMultipleSessions";
final int SECONDS_TO_RUN = 10;
final int NUM_THREADS = 60;
final KuduTable table = createTable(TABLE_NAME, basicSchema,
getBasicCreateTableOptions());
final AtomicInteger numUpserted = new AtomicInteger(0);
try (final KuduClient client =
new KuduClient.KuduClientBuilder(masterAddresses)
.defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
.build()) {

runTasks(NUM_THREADS, SECONDS_TO_RUN, new Supplier<Callable<Void>>() {
@Override
public Callable<Void> get() {
return new Callable<Void>() {
@Override
public Void call() throws Exception {
KuduSession s = client.newSession();
s.setFlushMode(FlushMode.AUTO_FLUSH_SYNC);
try {
for (int i = 0; i < 100; i++) {
Upsert u = table.newUpsert();
u.getRow().addInt(0, i);
u.getRow().addInt(1, 12345);
u.getRow().addInt(2, 3);
u.getRow().setNull(3);
u.getRow().addBoolean(4, false);
OperationResponse apply = s.apply(u);
if (apply.hasRowError()) {
throw new AssertionError(apply.getRowError().toString());
}
numUpserted.incrementAndGet();
}
} finally {
s.close();
}
return null;
}
};
}
});
}
LOG.info("Upserted {} rows", numUpserted.get());
}
}

0 comments on commit 96cdcbd

Please sign in to comment.