Skip to content

Commit

Permalink
KUDU-818(4/many): Add partition key range scan options
Browse files Browse the repository at this point in the history
Exposing partition range options for scanners is necessary for clients who want
to chop up a table into seperate scan ranges by locality (i.e. MapReduce and
Impala). This commit adds these options in a very raw way that we will not want
to support going forward, but is the most expedient way forward at this point.
These APIs are marked for internal use only.

Eventually we will provide a higher-level abstraction over locality-preserving
scan ranges, with built-in splitting for enabling applications that need scans
with finer-granularity.

Change-Id: I80cecb4870840d1f74b65cabe66936af2dfc15b5
Reviewed-on: http://gerrit.sjc.cloudera.com:8080/8056
Tested-by: jenkins
Reviewed-by: Todd Lipcon <[email protected]>
  • Loading branch information
danburkert authored and toddlipcon committed Sep 17, 2015
1 parent 50f157d commit e4e79ce
Show file tree
Hide file tree
Showing 16 changed files with 428 additions and 74 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,6 @@ www/tracing.*
python/kudu/*.so
python/kudu/*.cpp
*.py[ocd]

# JavaDoc
apidocs/
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public abstract class AbstractKuduScannerBuilder
boolean prefetching = false;
boolean cacheBlocks = true;
long htTimestamp = AsyncKuduClient.NO_TIMESTAMP;
byte[] lowerBound = AsyncKuduClient.EMPTY_ARRAY;
byte[] upperBound = AsyncKuduClient.EMPTY_ARRAY;
byte[] lowerBoundPrimaryKey = AsyncKuduClient.EMPTY_ARRAY;
byte[] upperBoundPrimaryKey = AsyncKuduClient.EMPTY_ARRAY;
byte[] lowerBoundPartitionKey = AsyncKuduClient.EMPTY_ARRAY;
byte[] upperBoundPartitionKey = AsyncKuduClient.EMPTY_ARRAY;
List<String> projectedColumnNames = null;
long scanRequestTimeout;

Expand Down Expand Up @@ -157,7 +159,7 @@ public S scanRequestTimeout(long scanRequestTimeout) {
}

/**
* Add a lower bound (inclusive) for the scan.
* Add a lower bound (inclusive) primary key for the scan.
* If any bound is already added, this bound is intersected with that one.
* @param partialRow a partial row with specified key columns
* @return this instance
Expand All @@ -167,20 +169,22 @@ public S lowerBound(PartialRow partialRow) {
}

/**
* Like lowerBound() but the encoded key is an opaque byte array obtained elsewhere.
* @param encodedStartKey bytes containing an encoded start key
* Like lowerBoundPrimaryKey() but the encoded primary key is an opaque byte array obtained elsewhere.
* @param startPrimaryKey bytes containing an encoded start key
* @return this instance
* @deprecated use {@link #lowerBound(PartialRow)}
*/
public S lowerBoundRaw(byte[] encodedStartKey) {
if (lowerBound == AsyncKuduClient.EMPTY_ARRAY ||
Bytes.memcmp(encodedStartKey, lowerBound) > 0) {
this.lowerBound = encodedStartKey;
@Deprecated
public S lowerBoundRaw(byte[] startPrimaryKey) {
if (lowerBoundPrimaryKey == AsyncKuduClient.EMPTY_ARRAY ||
Bytes.memcmp(startPrimaryKey, lowerBoundPrimaryKey) > 0) {
this.lowerBoundPrimaryKey = startPrimaryKey;
}
return (S) this;
}

/**
* Add an upper bound (exclusive) for the scan.
* Add an upper bound (exclusive) primary key for the scan.
* If any bound is already added, this bound is intersected with that one.
* @param partialRow a partial row with specified key columns
* @return this instance
Expand All @@ -190,18 +194,47 @@ public S exclusiveUpperBound(PartialRow partialRow) {
}

/**
* Like exclusiveUpperBound() but the encoded key is an opaque byte array obtained elsewhere.
* @param encodedEndKey bytes containing an encoded end key
* Like exclusiveUpperBound() but the encoded primary key is an opaque byte array obtained elsewhere.
* @param endPrimaryKey bytes containing an encoded end key
* @return this instance
* @deprecated use {@link #exclusiveUpperBound(PartialRow)}
*/
public S exclusiveUpperBoundRaw(byte[] encodedEndKey) {
if (upperBound == AsyncKuduClient.EMPTY_ARRAY ||
Bytes.memcmp(encodedEndKey, upperBound) < 0) {
this.upperBound = encodedEndKey;
@Deprecated
public S exclusiveUpperBoundRaw(byte[] endPrimaryKey) {
if (upperBoundPrimaryKey == AsyncKuduClient.EMPTY_ARRAY ||
Bytes.memcmp(endPrimaryKey, upperBoundPrimaryKey) < 0) {
this.upperBoundPrimaryKey = endPrimaryKey;
}
return (S) this;
}

/**
* Set an encoded (inclusive) start partition key for the scan.
*
* @param partitionKey the encoded partition key
* @return this instance
*/
@InterfaceAudience.LimitedPrivate("Impala")
public S lowerBoundPartitionKeyRaw(byte[] partitionKey) {
if (Bytes.memcmp(partitionKey, lowerBoundPartitionKey) > 0) {
this.lowerBoundPartitionKey = partitionKey;
}
return (S) this;
}

/**
* Set an encoded (exclusive) end partition key for the scan.
*
* @param partitionKey the encoded partition key
* @return this instance
*/
@InterfaceAudience.LimitedPrivate("Impala")
public S exclusiveUpperBoundPartitionKeyRaw(byte[] partitionKey) {
if (upperBoundPartitionKey.length == 0 || Bytes.memcmp(partitionKey, upperBoundPartitionKey) < 0) {
this.upperBoundPartitionKey = partitionKey;
}
return (S) this;
}

public abstract T build();
}

Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,17 @@ public Common.ReadMode pbVersion() {
private final long limit;

/**
* The partition key of the next tablet to scan.
* The start partition key of the next tablet to scan.
*
* Each time the scan exhausts a tablet, this is updated to that tablet's end partition key.
*/
private byte[] nextPartitionKey;

/**
* The end partition key of the last tablet to scan.
*/
private final byte[] endPartitionKey;

/**
* Set in the builder. If it's not set by the user, it will default to EMPTY_ARRAY.
* It is then reset to the new start primary key of each tablet we open a scanner on as the scan
Expand Down Expand Up @@ -193,6 +198,7 @@ public Common.ReadMode pbVersion() {
List<Tserver.ColumnRangePredicatePB> columnRangePredicates, long limit,
boolean cacheBlocks, boolean prefetching,
byte[] startPrimaryKey, byte[] endPrimaryKey,
byte[] startPartitionKey, byte[] endPartitionKey,
long htTimestamp, int maxNumBytes) {
Preconditions.checkArgument(maxNumBytes > 0, "Need a strictly positive number of bytes, " +
"got %s", maxNumBytes);
Expand All @@ -217,11 +223,35 @@ public Common.ReadMode pbVersion() {
this.maxNumBytes = maxNumBytes;

if (table.getPartitionSchema().isSimpleRangePartitioning()) {
// For tables with default partitioning, we can optimize the tablet lookup
// to skip tablets with partition keys below the start primary key.
this.nextPartitionKey = this.startPrimaryKey;
// If the table is simple range partitioned, then the partition key space
// is isomorphic to the primary key space. We can potentially reduce the
// scan length by only scanning the intersection of the primary key range
// and the partition key range. This is a stop-gap until real partition
// pruning is in place that can work across any partitioning type.

if ((endPartitionKey.length != 0 && Bytes.memcmp(startPrimaryKey, endPartitionKey) >= 0) ||
(endPrimaryKey.length != 0 && Bytes.memcmp(startPartitionKey, endPrimaryKey) >= 0)) {
// The primary key range and the partition key range do not intersect;
// the scan will be empty.
this.nextPartitionKey = startPartitionKey;
this.endPartitionKey = endPartitionKey;
} else {
// Assign the scan's partition key range to the intersection of the
// primary key and partition key ranges.
if (Bytes.memcmp(startPartitionKey, startPrimaryKey) < 0) {
this.nextPartitionKey = startPrimaryKey;
} else {
this.nextPartitionKey = startPartitionKey;
}
if (endPrimaryKey.length != 0 && Bytes.memcmp(endPartitionKey, endPrimaryKey) > 0) {
this.endPartitionKey = endPrimaryKey;
} else {
this.endPartitionKey = endPartitionKey;
}
}
} else {
this.nextPartitionKey = AsyncKuduClient.EMPTY_ARRAY;
this.nextPartitionKey = startPartitionKey;
this.endPartitionKey = endPartitionKey;
}

// Map the column names to actual columns in the table schema.
Expand All @@ -241,7 +271,6 @@ public Common.ReadMode pbVersion() {
}
}


/**
* Returns the maximum number of rows that this scanner was configured to return.
* @return a long representing the maximum number of rows that can be returned
Expand Down Expand Up @@ -387,14 +416,11 @@ public String toString() {
}

void scanFinished() {
// We're done if
// 1) the last tablet has been scanned, or
// 2) the table is using default partitioning and we're past the end primary key.
Partition partition = tablet.getPartition();
// Stop scanning if we have scanned until or past the end partition key.
if (partition.isEndPartition()
|| (this.table.getPartitionSchema().isSimpleRangePartitioning()
&& this.endPrimaryKey != AsyncKuduClient.EMPTY_ARRAY
&& Bytes.memcmp(this.endPrimaryKey, partition.getPartitionKeyEnd()) <= 0)) {
|| (this.endPartitionKey != AsyncKuduClient.EMPTY_ARRAY
&& Bytes.memcmp(this.endPartitionKey, partition.getPartitionKeyEnd()) <= 0)) {
hasMore = false;
closed = true; // the scanner is closed on the other side at this point
return;
Expand Down Expand Up @@ -711,7 +737,9 @@ public AsyncKuduScanner build() {
return new AsyncKuduScanner(
client, table, projectedColumnNames, readMode,
scanRequestTimeout, columnRangePredicates, limit, cacheBlocks,
prefetching, lowerBound, upperBound, htTimestamp, maxNumBytes);
prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
lowerBoundPartitionKey, upperBoundPartitionKey,
htTimestamp, maxNumBytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public KuduScanner build() {
return new KuduScanner(new AsyncKuduScanner(
client, table, projectedColumnNames, readMode,
scanRequestTimeout, columnRangePredicates, limit, cacheBlocks,
prefetching, lowerBound, upperBound,
prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
lowerBoundPartitionKey, upperBoundPartitionKey,
htTimestamp, maxNumBytes));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
* In addition to the start and end partition keys, a Partition holds metadata
* to determine if a scan can prune, or skip, a partition based on the scan's
* start and end primary keys, and predicates.
*
* This class is new, and not considered stable or suitable for public use.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@InterfaceAudience.LimitedPrivate("Impala")
@InterfaceStability.Unstable
public class Partition implements Comparable<Partition> {
final byte[] partitionKeyStart;
final byte[] partitionKeyEnd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
* Each hash bucket component includes one or more columns from the primary key
* column set, with the restriction that an individual primary key column may
* only be included in a single hash component.
*
* This class is new, and not considered stable or suitable for public use.
*/
@InterfaceAudience.Public
@InterfaceAudience.LimitedPrivate("Impala")
@InterfaceStability.Unstable
class PartitionSchema {
public class PartitionSchema {

private final RangeSchema rangeSchema;
private final List<HashBucketSchema> hashBucketSchemas;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

Expand Down Expand Up @@ -81,7 +82,6 @@ private void testPartitionSchema(CreateTableBuilder tableBuilder) throws Excepti
syncClient.createTable(tableName, schema, tableBuilder);

KuduTable table = syncClient.openTable(tableName);
assertFalse(table.getPartitionSchema().isSimpleRangePartitioning());

Set<Row> rows = rows();
insertRows(table, rows);
Expand Down Expand Up @@ -135,6 +135,52 @@ private void testPartitionSchema(CreateTableBuilder tableBuilder) throws Excepti

assertEquals(expected, results);
}

List<LocatedTablet> tablets = table.getTabletsLocations(TestTimeouts.DEFAULT_SLEEP);

{ // Per-tablet scan
Set<Row> results = new HashSet<>();

for (LocatedTablet tablet : tablets) {
KuduScanner scanner = syncClient.newScannerBuilder(table)
.lowerBoundPartitionKeyRaw(tablet.getPartition().getPartitionKeyStart())
.exclusiveUpperBoundPartitionKeyRaw(tablet.getPartition().getPartitionKeyEnd())
.build();
Set<Row> tabletResults = collectRows(scanner);
Set<Row> intersection = Sets.intersection(results, tabletResults);
assertEquals(new HashSet<>(), intersection);
results.addAll(tabletResults);
}

assertEquals(rows, results);
}

{ // Per-tablet scan with lower & upper bounds
Row minRow = new Row("1", "3", "5");
Row maxRow = new Row("2", "4", "");
PartialRow lowerBound = schema.newPartialRow();
minRow.fillPartialRow(lowerBound);
PartialRow upperBound = schema.newPartialRow();
maxRow.fillPartialRow(upperBound);

Set<Row> expected = Sets.filter(rows, Predicates.and(minRow.gtePred(), maxRow.ltPred()));
Set<Row> results = new HashSet<>();

for (LocatedTablet tablet : tablets) {
KuduScanner scanner = syncClient.newScannerBuilder(table)
.lowerBound(lowerBound)
.exclusiveUpperBound(upperBound)
.lowerBoundPartitionKeyRaw(tablet.getPartition().getPartitionKeyStart())
.exclusiveUpperBoundPartitionKeyRaw(tablet.getPartition().getPartitionKeyEnd())
.build();
Set<Row> tabletResults = collectRows(scanner);
Set<Row> intersection = Sets.intersection(results, tabletResults);
assertEquals(new HashSet<>(), intersection);
results.addAll(tabletResults);
}

assertEquals(expected, results);
}
}

@Test
Expand Down Expand Up @@ -182,6 +228,21 @@ public void testHashBucketedAndRangePartitionedTable() throws Exception {
testPartitionSchema(tableBuilder);
}

@Test
public void testSimplePartitionedTable() throws Exception {
Schema schema = createSchema();
CreateTableBuilder tableBuilder = new CreateTableBuilder();

PartialRow split = schema.newPartialRow();
split.addString("c", "3");
tableBuilder.addSplitRow(split);
split.addString("c", "3");
split.addString("b", "3");
tableBuilder.addSplitRow(split);

testPartitionSchema(tableBuilder);
}

public static class Row implements Comparable<Row> {
private final String a;
private final String b;
Expand Down
Loading

0 comments on commit e4e79ce

Please sign in to comment.