Skip to content

Commit

Permalink
java: Move scan token tests into their own class
Browse files Browse the repository at this point in the history
A follow-up commit will add additional tests there.

Change-Id: Iefc23f0193cb24a00005a5554881eedbaac15929
Reviewed-on: http://gerrit.cloudera.org:8080/10724
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <[email protected]>
  • Loading branch information
mpercy committed Jul 3, 2018
1 parent 8155d8c commit 0cda8c8
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.stumbleupon.async.Deferred;

import org.apache.kudu.util.TimestampUtil;
Expand Down Expand Up @@ -586,238 +584,12 @@ public void testScanWithPredicates() throws Exception {
).size());
}

/**
* Counts the rows in the provided scan tokens.
*/
private int countScanTokenRows(List<KuduScanToken> tokens) throws Exception {
final AtomicInteger count = new AtomicInteger(0);
List<Thread> threads = new ArrayList<>();
for (final KuduScanToken token : tokens) {
final byte[] serializedToken = token.serialize();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try (KuduClient contextClient = new KuduClient.KuduClientBuilder(masterAddresses)
.defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
.build()) {
KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, contextClient);
try {
int localCount = 0;
while (scanner.hasMoreRows()) {
localCount += Iterators.size(scanner.nextRows());
}
count.addAndGet(localCount);
} finally {
scanner.close();
}
} catch (Exception e) {
LOG.error("exception in parallel token scanner", e);
}
}
});
thread.run();
threads.add(thread);
}

for (Thread thread : threads) {
thread.join();
}
return count.get();
}

@Test
public void testGetAuthnToken() throws Exception {
byte[] token = client.exportAuthenticationCredentials().join();
assertNotNull(token);
}

/**
* Tests scan tokens by creating a set of scan tokens, serializing them, and
* then executing them in parallel with separate client instances. This
* simulates the normal usecase of scan tokens being created at a central
* planner and distributed to remote task executors.
*/
@Test
public void testScanTokens() throws Exception {
int saveFetchTablets = AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP;
try {
// For this test, make sure that we cover the case that not all tablets
// are returned in a single batch.
AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP = 4;

Schema schema = createManyStringsSchema();
CreateTableOptions createOptions = new CreateTableOptions();
createOptions.addHashPartitions(ImmutableList.of("key"), 8);

PartialRow splitRow = schema.newPartialRow();
splitRow.addString("key", "key_50");
createOptions.addSplitRow(splitRow);

syncClient.createTable(tableName, schema, createOptions);

KuduSession session = syncClient.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
KuduTable table = syncClient.openTable(tableName);
for (int i = 0; i < 100; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addString("key", String.format("key_%02d", i));
row.addString("c1", "c1_" + i);
row.addString("c2", "c2_" + i);
session.apply(insert);
}
session.flush();

KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table);
tokenBuilder.batchSizeBytes(0);
tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
List<KuduScanToken> tokens = tokenBuilder.build();
assertEquals(16, tokens.size());

// KUDU-1809, with batchSizeBytes configured to '0',
// the first call to the tablet server won't return
// any data.
{
KuduScanner scanner = tokens.get(0).intoScanner(syncClient);
assertEquals(0, scanner.nextRows().getNumRows());
}

for (KuduScanToken token : tokens) {
// Sanity check to make sure the debug printing does not throw.
LOG.debug(KuduScanToken.stringifySerializedToken(token.serialize(), syncClient));
}
} finally {
AsyncKuduClient.FETCH_TABLETS_PER_RANGE_LOOKUP = saveFetchTablets;
}
}

/**
* Tests scan token creation and execution on a table with non-covering range partitions.
*/
@Test
public void testScanTokensNonCoveringRangePartitions() throws Exception {
Schema schema = createManyStringsSchema();
CreateTableOptions createOptions = new CreateTableOptions();
createOptions.addHashPartitions(ImmutableList.of("key"), 2);

PartialRow lower = schema.newPartialRow();
PartialRow upper = schema.newPartialRow();
lower.addString("key", "a");
upper.addString("key", "f");
createOptions.addRangePartition(lower, upper);

lower = schema.newPartialRow();
upper = schema.newPartialRow();
lower.addString("key", "h");
upper.addString("key", "z");
createOptions.addRangePartition(lower, upper);

PartialRow split = schema.newPartialRow();
split.addString("key", "k");
createOptions.addSplitRow(split);

syncClient.createTable(tableName, schema, createOptions);

KuduSession session = syncClient.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
KuduTable table = syncClient.openTable(tableName);
for (char c = 'a'; c < 'f'; c++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addString("key", "" + c);
row.addString("c1", "c1_" + c);
row.addString("c2", "c2_" + c);
session.apply(insert);
}
for (char c = 'h'; c < 'z'; c++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addString("key", "" + c);
row.addString("c1", "c1_" + c);
row.addString("c2", "c2_" + c);
session.apply(insert);
}
session.flush();

KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table);
tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
List<KuduScanToken> tokens = tokenBuilder.build();
assertEquals(6, tokens.size());
assertEquals('f' - 'a' + 'z' - 'h', countScanTokenRows(tokens));

for (KuduScanToken token : tokens) {
// Sanity check to make sure the debug printing does not throw.
LOG.debug(KuduScanToken.stringifySerializedToken(token.serialize(), syncClient));
}
}

/**
* Tests the results of creating scan tokens, altering the columns being
* scanned, and then executing the scan tokens.
*/
@Test
public void testScanTokensConcurrentAlterTable() throws Exception {
Schema schema = new Schema(ImmutableList.of(
new ColumnSchema.ColumnSchemaBuilder("key", Type.INT64).nullable(false).key(true).build(),
new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64).nullable(false).key(false).build()
));
CreateTableOptions createOptions = new CreateTableOptions();
createOptions.setRangePartitionColumns(ImmutableList.<String>of());
createOptions.setNumReplicas(1);
syncClient.createTable(tableName, schema, createOptions);

KuduTable table = syncClient.openTable(tableName);

KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table);
List<KuduScanToken> tokens = tokenBuilder.build();
assertEquals(1, tokens.size());
KuduScanToken token = tokens.get(0);

// Drop a column
syncClient.alterTable(tableName, new AlterTableOptions().dropColumn("a"));
try {
token.intoScanner(syncClient);
fail();
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("Unknown column"));
}

// Add back the column with the wrong type.
syncClient.alterTable(
tableName,
new AlterTableOptions().addColumn(
new ColumnSchema.ColumnSchemaBuilder("a", Type.STRING).nullable(true).build()));
try {
token.intoScanner(syncClient);
fail();
} catch (IllegalStateException e) {
assertTrue(e.getMessage().contains(
"invalid type INT64 for column 'a' in scan token, expected: STRING"));
}

// Add the column with the wrong nullability.
syncClient.alterTable(
tableName,
new AlterTableOptions().dropColumn("a")
.addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
.nullable(true).build()));
try {
token.intoScanner(syncClient);
fail();
} catch (IllegalStateException e) {
assertTrue(e.getMessage().contains(
"invalid nullability for column 'a' in scan token, expected: NOT NULL"));
}

// Add the column with the correct type and nullability.
syncClient.alterTable(
tableName,
new AlterTableOptions().dropColumn("a")
.addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
.nullable(false)
.defaultValue(0L).build()));
token.intoScanner(syncClient);
}

/**
* Counts the rows in a table between two optional bounds.
Expand Down
Loading

0 comments on commit 0cda8c8

Please sign in to comment.