diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java index abf5538ced..cb92c29fb5 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java @@ -297,7 +297,8 @@ public AlterTableOptions addRangePartition(PartialRow lowerBound, step.setAddRangePartition(builder); if (!pb.hasSchema()) { pb.setSchema(ProtobufHelper.schemaToPb(lowerBound.getSchema(), - EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT))); + EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT, + SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID))); } return this; } @@ -357,7 +358,8 @@ public AlterTableOptions dropRangePartition(PartialRow lowerBound, step.setDropRangePartition(builder); if (!pb.hasSchema()) { pb.setSchema(ProtobufHelper.schemaToPb(lowerBound.getSchema(), - EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT))); + EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT, + SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID))); } return this; } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java index c270f82cc8..f5c5eb5d56 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java @@ -18,6 +18,7 @@ package org.apache.kudu.client; import java.util.Collection; +import java.util.EnumSet; import java.util.List; import com.google.protobuf.Message; @@ -25,6 +26,7 @@ import org.jboss.netty.util.Timer; import org.apache.kudu.Schema; +import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags; import org.apache.kudu.master.Master; import org.apache.kudu.util.Pair; @@ -57,7 +59,9 @@ class CreateTableRequest extends KuduRpc { @Override Message createRequestPB() { this.builder.setName(this.name); - this.builder.setSchema(ProtobufHelper.schemaToPb(this.schema)); + this.builder.setSchema( + ProtobufHelper.schemaToPb(this.schema, + EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID))); return this.builder.build(); } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java index adabf261b8..bfb071205b 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java @@ -19,11 +19,11 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.UnsafeByteOperations; @@ -32,7 +32,9 @@ import org.apache.kudu.ColumnSchema; import org.apache.kudu.Common; +import org.apache.kudu.Schema; import org.apache.kudu.client.Client.ScanTokenPB; +import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags; import org.apache.kudu.util.Pair; /** @@ -159,6 +161,29 @@ public static String stringifySerializedToken(byte[] buf, KuduClient client) thr return helper.toString(); } + private static List computeProjectedColumnIndexesForScanner(ScanTokenPB message, + Schema schema) { + List columns = new ArrayList<>(message.getProjectedColumnsCount()); + for (Common.ColumnSchemaPB colSchemaFromPb : message.getProjectedColumnsList()) { + int colIdx = colSchemaFromPb.hasId() && schema.hasColumnIds() ? + schema.getColumnIndex(colSchemaFromPb.getId()) : + schema.getColumnIndex(colSchemaFromPb.getName()); + ColumnSchema colSchema = schema.getColumnByIndex(colIdx); + if (colSchemaFromPb.getType() != colSchema.getType().getDataType(colSchema.getTypeAttributes())) { + throw new IllegalStateException(String.format( + "invalid type %s for column '%s' in scan token, expected: %s", + colSchemaFromPb.getType().name(), colSchemaFromPb.getName(), colSchema.getType().name())); + } + if (colSchemaFromPb.getIsNullable() != colSchema.isNullable()) { + throw new IllegalStateException(String.format( + "invalid nullability for column '%s' in scan token, expected: %s", + colSchemaFromPb.getName(), colSchema.isNullable() ? "NULLABLE" : "NOT NULL")); + } + columns.add(colIdx); + } + return columns; + } + private static KuduScanner pbIntoScanner(ScanTokenPB message, KuduClient client) throws KuduException { Preconditions.checkArgument( @@ -169,25 +194,9 @@ private static KuduScanner pbIntoScanner(ScanTokenPB message, client.openTable(message.getTableName()); KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table); - List columns = new ArrayList<>(message.getProjectedColumnsCount()); - for (Common.ColumnSchemaPB column : message.getProjectedColumnsList()) { - int columnIdx = table.getSchema().getColumnIndex(column.getName()); - ColumnSchema schema = table.getSchema().getColumnByIndex(columnIdx); - if (column.getType() != schema.getType().getDataType(schema.getTypeAttributes())) { - throw new IllegalStateException(String.format( - "invalid type %s for column '%s' in scan token, expected: %s", - column.getType().name(), column.getName(), schema.getType().name())); - } - if (column.getIsNullable() != schema.isNullable()) { - throw new IllegalStateException(String.format( - "invalid nullability for column '%s' in scan token, expected: %s", - column.getName(), column.getIsNullable() ? "NULLABLE" : "NOT NULL")); - - } - columns.add(columnIdx); - } - builder.setProjectedColumnIndexes(columns); + builder.setProjectedColumnIndexes( + computeProjectedColumnIndexesForScanner(message, table.getSchema())); for (Common.ColumnPredicatePB pred : message.getColumnPredicatesList()) { builder.addPredicate(KuduPredicate.fromPB(table.getSchema(), pred)); @@ -355,21 +364,32 @@ public List build() { // Map the column names or indices to actual columns in the table schema. // If the user did not set either projection, then scan all columns. + Schema schema = table.getSchema(); if (projectedColumnNames != null) { for (String columnName : projectedColumnNames) { - ColumnSchema columnSchema = table.getSchema().getColumn(columnName); + ColumnSchema columnSchema = schema.getColumn(columnName); Preconditions.checkArgument(columnSchema != null, "unknown column i%s", columnName); - ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), columnSchema); + ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), + schema.hasColumnIds() ? schema.getColumnId(columnName) : -1, + columnSchema); } } else if (projectedColumnIndexes != null) { for (int columnIdx : projectedColumnIndexes) { - ColumnSchema columnSchema = table.getSchema().getColumnByIndex(columnIdx); + ColumnSchema columnSchema = schema.getColumnByIndex(columnIdx); Preconditions.checkArgument(columnSchema != null, "unknown column index %s", columnIdx); - ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), columnSchema); + ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), + schema.hasColumnIds() ? + schema.getColumnId(columnSchema.getName()) : + -1, + columnSchema); } } else { - for (ColumnSchema column : table.getSchema().getColumns()) { - ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), column); + for (ColumnSchema column : schema.getColumns()) { + ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), + schema.hasColumnIds() ? + schema.getColumnId(column.getName()) : + -1, + column); } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java index bb655f03b5..998692b7db 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java @@ -290,7 +290,8 @@ static Tserver.WriteRequestPB.Builder createAndFillWriteRequestPB(List schemaToListPb(Schema schema) { return schemaToListPb(schema, EnumSet.noneOf(SchemaPBConversionFlags.class)); } - public static List schemaToListPb( - Schema schema, EnumSet flags) { - ArrayList columns = - new ArrayList(schema.getColumnCount()); + public static List schemaToListPb(Schema schema, + EnumSet flags) { + ArrayList columns = new ArrayList<>(schema.getColumnCount()); Common.ColumnSchemaPB.Builder schemaBuilder = Common.ColumnSchemaPB.newBuilder(); for (ColumnSchema col : schema.getColumns()) { - columns.add(columnToPb(schemaBuilder, col, flags)); + int id = schema.hasColumnIds() ? schema.getColumnId(col.getName()) : -1; + columns.add(columnToPb(schemaBuilder, id, col, flags)); schemaBuilder.clear(); } return columns; @@ -74,24 +75,30 @@ public static Common.SchemaPB schemaToPb(Schema schema) { return schemaToPb(schema, EnumSet.noneOf(SchemaPBConversionFlags.class)); } - public static Common.SchemaPB schemaToPb( - Schema schema, EnumSet flags) { + public static Common.SchemaPB schemaToPb(Schema schema, + EnumSet flags) { Common.SchemaPB.Builder builder = Common.SchemaPB.newBuilder(); builder.addAllColumns(schemaToListPb(schema, flags)); return builder.build(); } public static Common.ColumnSchemaPB columnToPb(ColumnSchema column) { - return columnToPb(Common.ColumnSchemaPB.newBuilder(), column); + return columnToPb(Common.ColumnSchemaPB.newBuilder(), -1, column); } public static Common.ColumnSchemaPB columnToPb(Common.ColumnSchemaPB.Builder schemaBuilder, + int colId, ColumnSchema column) { - return columnToPb(schemaBuilder, column, EnumSet.noneOf(SchemaPBConversionFlags.class)); + return columnToPb(schemaBuilder, + colId, + column, + EnumSet.noneOf(SchemaPBConversionFlags.class)); } public static Common.ColumnSchemaPB columnToPb(Common.ColumnSchemaPB.Builder schemaBuilder, - ColumnSchema column, EnumSet flags) { + int colId, + ColumnSchema column, + EnumSet flags) { schemaBuilder .setName(column.getName()) .setType(column.getWireType()) @@ -99,6 +106,9 @@ public static Common.ColumnSchemaPB columnToPb(Common.ColumnSchemaPB.Builder sch .setIsNullable(column.isNullable()) .setCfileBlockSize(column.getDesiredBlockSize()); + if (!flags.contains(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID) && colId >= 0) { + schemaBuilder.setId(colId); + } if (column.getEncoding() != null) { schemaBuilder.setEncoding(column.getEncoding().getInternalPbType()); } diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java index 4caf6ddbbd..2b80d4a8bf 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java @@ -22,6 +22,7 @@ import java.math.BigDecimal; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; import com.google.common.collect.ImmutableList; @@ -37,6 +38,7 @@ import org.apache.kudu.Type; import org.apache.kudu.client.PartitionSchema.HashBucketSchema; import org.apache.kudu.client.PartitionSchema.RangeSchema; +import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags; import org.apache.kudu.util.DecimalUtil; public class TestKeyEncoding { @@ -55,9 +57,10 @@ private static Schema buildSchema(ColumnSchemaBuilder... columns) { int i = 0; Common.SchemaPB.Builder pb = Common.SchemaPB.newBuilder(); for (ColumnSchemaBuilder column : columns) { - Common.ColumnSchemaPB.Builder columnPb = - ProtobufHelper.columnToPb(column.build()).toBuilder(); - columnPb.setId(i++); + Common.ColumnSchemaPB columnPb = + ProtobufHelper.columnToPb(Common.ColumnSchemaPB.newBuilder(), + i++, + column.build()); pb.addColumns(columnPb); } return ProtobufHelper.pbToSchema(pb.build()); diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java index 1d2da18eec..c203e9536e 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java @@ -215,41 +215,64 @@ public void testScanTokensConcurrentAlterTable() throws Exception { assertTrue(e.getMessage().contains("Unknown column")); } - // Add back the column with the wrong type. + // Add a column with the same name, type, and nullability. It will have a different id-- it's a + // different column-- so the scan token will fail. client.alterTable( testTableName, - new AlterTableOptions().addColumn( - new ColumnSchema.ColumnSchemaBuilder("a", Type.STRING).nullable(true).build())); + new AlterTableOptions() + .addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64) + .nullable(false) + .defaultValue(0L).build())); try { token.intoScanner(client); fail(); - } catch (IllegalStateException e) { + } catch (IllegalArgumentException e) { assertTrue(e.getMessage().contains( - "invalid type INT64 for column 'a' in scan token, expected: STRING")); + "Unknown column")); } + } - // Add the column with the wrong nullability. - client.alterTable( - testTableName, - new AlterTableOptions().dropColumn("a") - .addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64) - .nullable(true).build())); + /** + * Tests that it is possible to create a scan token, rename a column, and rehydrate a scanner from + * the scan token with the old column name. + */ + @Test + public void testScanTokensConcurrentColumnRename() throws Exception { + Schema schema = getBasicSchema(); + String oldColName = schema.getColumnByIndex(1).getName(); + CreateTableOptions createOptions = new CreateTableOptions(); + createOptions.setRangePartitionColumns(ImmutableList.of()); + createOptions.setNumReplicas(1); + client.createTable(testTableName, schema, createOptions); + + KuduTable table = client.openTable(testTableName); + + KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table); + List tokens = tokenBuilder.build(); + assertEquals(1, tokens.size()); + KuduScanToken token = tokens.get(0); + + // Rename a column. + String newColName = "new-name"; + client.alterTable(testTableName, new AlterTableOptions().renameColumn(oldColName, newColName)); + + KuduScanner scanner = token.intoScanner(client); + + // TODO(wdberkeley): Handle renaming a column between when the token is rehydrated as a scanner + // and when the scanner first hits a replica. Note that this is almost certainly a very + // short period of vulnerability. + + assertEquals(0, countRowsInScan(scanner)); + + // Test that the old name cannot be used and the new name can be. + Schema alteredSchema = scanner.getProjectionSchema(); try { - token.intoScanner(client); + alteredSchema.getColumn(oldColName); fail(); - } catch (IllegalStateException e) { - assertTrue(e.getMessage().contains( - "invalid nullability for column 'a' in scan token, expected: NOT NULL")); + } catch (IllegalArgumentException ex) { + // Good. } - - // Add the column with the correct type and nullability. - client.alterTable( - testTableName, - new AlterTableOptions().dropColumn("a") - .addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64) - .nullable(false) - .defaultValue(0L).build())); - token.intoScanner(client); + alteredSchema.getColumn(newColName); } /** @@ -357,4 +380,103 @@ public void testScanRequestTimeout() throws IOException { assertEquals(SCAN_REQUEST_TIMEOUT_MS, scanner.getScanRequestTimeout()); } } + + // Helper for scan token tests that use diff scan. + private long setupTableForDiffScans(KuduClient client, + KuduTable table, + int numRows) throws Exception { + KuduSession session = client.newSession(); + for (int i = 0 ; i < numRows / 2; i++) { + session.apply(createBasicSchemaInsert(table, i)); + } + + // Grab the timestamp, then add more data so there's a diff. + long timestamp = client.getLastPropagatedTimestamp(); + for (int i = numRows / 2; i < numRows; i++) { + session.apply(createBasicSchemaInsert(table, i)); + } + // Delete some data so the is_deleted column can be tested. + for (int i = 0; i < numRows / 4; i++) { + Delete delete = table.newDelete(); + PartialRow row = delete.getRow(); + row.addInt(0, i); + session.apply(delete); + } + + return timestamp; + } + + // Helper to check diff scan results. + private void checkDiffScanResults(KuduScanner scanner, + int numExpectedMutations, + int numExpectedDeletes) throws KuduException { + int numMutations = 0; + int numDeletes = 0; + while (scanner.hasMoreRows()) { + for (RowResult rowResult : scanner.nextRows()) { + numMutations++; + if (rowResult.isDeleted()) numDeletes++; + } + } + assertEquals(numExpectedMutations, numMutations); + assertEquals(numExpectedDeletes, numDeletes); + } + + /** Test that scan tokens work with diff scans. */ + @Test + public void testDiffScanTokens() throws Exception { + Schema schema = getBasicSchema(); + CreateTableOptions createOptions = new CreateTableOptions(); + createOptions.setRangePartitionColumns(ImmutableList.of()); + createOptions.setNumReplicas(1); + KuduTable table = client.createTable(testTableName, schema, createOptions); + + // Set up the table for a diff scan. + int numRows = 20; + long timestamp = setupTableForDiffScans(client, table, numRows); + + // Since the diff scan interval is [start, end), increment the start timestamp to exclude + // the last row inserted in the first group of ops, and increment the end timestamp to include + // the last row deleted in the second group of ops. + List tokens = client.newScanTokenBuilder(table) + .diffScan(timestamp + 1, client.getLastPropagatedTimestamp() + 1) + .build(); + assertEquals(1, tokens.size()); + + checkDiffScanResults(tokens.get(0).intoScanner(client), 3 * numRows / 4, numRows / 4); + } + + /** Test that scan tokens work with diff scans even when columns are renamed. */ + @Test + public void testDiffScanTokensConcurrentColumnRename() throws Exception { + Schema schema = getBasicSchema(); + CreateTableOptions createOptions = new CreateTableOptions(); + createOptions.setRangePartitionColumns(ImmutableList.of()); + createOptions.setNumReplicas(1); + KuduTable table = client.createTable(testTableName, schema, createOptions); + + // Set up the table for a diff scan. + int numRows = 20; + long timestamp = setupTableForDiffScans(client, table, numRows); + + // Since the diff scan interval is [start, end), increment the start timestamp to exclude + // the last row inserted in the first group of ops, and increment the end timestamp to include + // the last row deleted in the second group of ops. + List tokens = client.newScanTokenBuilder(table) + .diffScan(timestamp + 1, client.getLastPropagatedTimestamp() + 1) + .build(); + assertEquals(1, tokens.size()); + + // Rename a column between when the token is created and when it is rehydrated into a scanner + client.alterTable(table.getName(), + new AlterTableOptions().renameColumn("column1_i", "column1_i_new")); + + KuduScanner scanner = tokens.get(0).intoScanner(client); + + // TODO(wdberkeley): Handle renaming a column between when the token is rehydrated as a scanner + // and when the scanner first hits a replica. Note that this is almost certainly a very + // short period of vulnerability. + + checkDiffScanResults(scanner, 3 * numRows / 4, numRows / 4); + } }