Skip to content

Commit

Permalink
[java] Favor column ids over column names in scan tokens
Browse files Browse the repository at this point in the history
Previously, a scan token would use column name to map a column in its
projection to a column in the target table's current schema.
Therefore, a scan token couldn't be used if a column were renamed
between when the token is cut and when it is rehydrated into a scanner.
This adjusts the Java client to prefer ids to names, to fix this
behavior. Since this involves including column ids when serializing
columns to PBs as part of scan tokens, but the server does not permit
clients to send column ids in most cases, this patch adds a new
serialization option that includes column ids.

Note that this patch does not make _scanners_ resistant to column name
changes. If a scanner is opened against a table and a column name
changes on a replica before the scanner opens a server-side scanner on
it, the scan will fail if the column is in the projection.

A follow-up will add similar capability to the C++ client.

Change-Id: Ib3f05a4175c7e7bfaec2cbd3586723e6de3823f0
Reviewed-on: http://gerrit.cloudera.org:8080/13562
Reviewed-by: Mike Percy <[email protected]>
Tested-by: Kudu Jenkins
  • Loading branch information
wdberkeley authored and mpercy committed Jun 14, 2019
1 parent a7bdd06 commit 0f2946b
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ public AlterTableOptions addRangePartition(PartialRow lowerBound,
step.setAddRangePartition(builder);
if (!pb.hasSchema()) {
pb.setSchema(ProtobufHelper.schemaToPb(lowerBound.getSchema(),
EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT)));
EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT,
SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
}
return this;
}
Expand Down Expand Up @@ -357,7 +358,8 @@ public AlterTableOptions dropRangePartition(PartialRow lowerBound,
step.setDropRangePartition(builder);
if (!pb.hasSchema()) {
pb.setSchema(ProtobufHelper.schemaToPb(lowerBound.getSchema(),
EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT)));
EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT,
SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.kudu.client;

import java.util.Collection;
import java.util.EnumSet;
import java.util.List;

import com.google.protobuf.Message;
import org.apache.yetus.audience.InterfaceAudience;
import org.jboss.netty.util.Timer;

import org.apache.kudu.Schema;
import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
import org.apache.kudu.master.Master;
import org.apache.kudu.util.Pair;

Expand Down Expand Up @@ -57,7 +59,9 @@ class CreateTableRequest extends KuduRpc<CreateTableResponse> {
@Override
Message createRequestPB() {
this.builder.setName(this.name);
this.builder.setSchema(ProtobufHelper.schemaToPb(this.schema));
this.builder.setSchema(
ProtobufHelper.schemaToPb(this.schema,
EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
return this.builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.UnsafeByteOperations;
Expand All @@ -32,7 +32,9 @@

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Common;
import org.apache.kudu.Schema;
import org.apache.kudu.client.Client.ScanTokenPB;
import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
import org.apache.kudu.util.Pair;

/**
Expand Down Expand Up @@ -159,6 +161,29 @@ public static String stringifySerializedToken(byte[] buf, KuduClient client) thr
return helper.toString();
}

private static List<Integer> computeProjectedColumnIndexesForScanner(ScanTokenPB message,
Schema schema) {
List<Integer> columns = new ArrayList<>(message.getProjectedColumnsCount());
for (Common.ColumnSchemaPB colSchemaFromPb : message.getProjectedColumnsList()) {
int colIdx = colSchemaFromPb.hasId() && schema.hasColumnIds() ?
schema.getColumnIndex(colSchemaFromPb.getId()) :
schema.getColumnIndex(colSchemaFromPb.getName());
ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
if (colSchemaFromPb.getType() != colSchema.getType().getDataType(colSchema.getTypeAttributes())) {
throw new IllegalStateException(String.format(
"invalid type %s for column '%s' in scan token, expected: %s",
colSchemaFromPb.getType().name(), colSchemaFromPb.getName(), colSchema.getType().name()));
}
if (colSchemaFromPb.getIsNullable() != colSchema.isNullable()) {
throw new IllegalStateException(String.format(
"invalid nullability for column '%s' in scan token, expected: %s",
colSchemaFromPb.getName(), colSchema.isNullable() ? "NULLABLE" : "NOT NULL"));
}
columns.add(colIdx);
}
return columns;
}

private static KuduScanner pbIntoScanner(ScanTokenPB message,
KuduClient client) throws KuduException {
Preconditions.checkArgument(
Expand All @@ -169,25 +194,9 @@ private static KuduScanner pbIntoScanner(ScanTokenPB message,
client.openTable(message.getTableName());
KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);

List<Integer> columns = new ArrayList<>(message.getProjectedColumnsCount());
for (Common.ColumnSchemaPB column : message.getProjectedColumnsList()) {
int columnIdx = table.getSchema().getColumnIndex(column.getName());
ColumnSchema schema = table.getSchema().getColumnByIndex(columnIdx);
if (column.getType() != schema.getType().getDataType(schema.getTypeAttributes())) {
throw new IllegalStateException(String.format(
"invalid type %s for column '%s' in scan token, expected: %s",
column.getType().name(), column.getName(), schema.getType().name()));
}
if (column.getIsNullable() != schema.isNullable()) {
throw new IllegalStateException(String.format(
"invalid nullability for column '%s' in scan token, expected: %s",
column.getName(), column.getIsNullable() ? "NULLABLE" : "NOT NULL"));

}

columns.add(columnIdx);
}
builder.setProjectedColumnIndexes(columns);
builder.setProjectedColumnIndexes(
computeProjectedColumnIndexesForScanner(message, table.getSchema()));

for (Common.ColumnPredicatePB pred : message.getColumnPredicatesList()) {
builder.addPredicate(KuduPredicate.fromPB(table.getSchema(), pred));
Expand Down Expand Up @@ -355,21 +364,32 @@ public List<KuduScanToken> build() {

// Map the column names or indices to actual columns in the table schema.
// If the user did not set either projection, then scan all columns.
Schema schema = table.getSchema();
if (projectedColumnNames != null) {
for (String columnName : projectedColumnNames) {
ColumnSchema columnSchema = table.getSchema().getColumn(columnName);
ColumnSchema columnSchema = schema.getColumn(columnName);
Preconditions.checkArgument(columnSchema != null, "unknown column i%s", columnName);
ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), columnSchema);
ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
schema.hasColumnIds() ? schema.getColumnId(columnName) : -1,
columnSchema);
}
} else if (projectedColumnIndexes != null) {
for (int columnIdx : projectedColumnIndexes) {
ColumnSchema columnSchema = table.getSchema().getColumnByIndex(columnIdx);
ColumnSchema columnSchema = schema.getColumnByIndex(columnIdx);
Preconditions.checkArgument(columnSchema != null, "unknown column index %s", columnIdx);
ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), columnSchema);
ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
schema.hasColumnIds() ?
schema.getColumnId(columnSchema.getName()) :
-1,
columnSchema);
}
} else {
for (ColumnSchema column : table.getSchema().getColumns()) {
ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), column);
for (ColumnSchema column : schema.getColumns()) {
ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
schema.hasColumnIds() ?
schema.getColumnId(column.getName()) :
-1,
column);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ static Tserver.WriteRequestPB.Builder createAndFillWriteRequestPB(List<Operation

Tserver.WriteRequestPB.Builder requestBuilder = Tserver.WriteRequestPB.newBuilder();
requestBuilder.setSchema(ProtobufHelper.schemaToPb(schema,
EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT)));
EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT,
SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
requestBuilder.setRowOperations(rowOps);
return requestBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public class ProtobufHelper {
* The flags that are not included while serializing.
*/
public enum SchemaPBConversionFlags {
SCHEMA_PB_WITHOUT_COMMENT;
SCHEMA_PB_WITHOUT_COMMENT,
SCHEMA_PB_WITHOUT_ID
}

/**
Expand All @@ -58,13 +59,13 @@ public static List<Common.ColumnSchemaPB> schemaToListPb(Schema schema) {
return schemaToListPb(schema, EnumSet.noneOf(SchemaPBConversionFlags.class));
}

public static List<Common.ColumnSchemaPB> schemaToListPb(
Schema schema, EnumSet<SchemaPBConversionFlags> flags) {
ArrayList<Common.ColumnSchemaPB> columns =
new ArrayList<Common.ColumnSchemaPB>(schema.getColumnCount());
public static List<Common.ColumnSchemaPB> schemaToListPb(Schema schema,
EnumSet<SchemaPBConversionFlags> flags) {
ArrayList<Common.ColumnSchemaPB> columns = new ArrayList<>(schema.getColumnCount());
Common.ColumnSchemaPB.Builder schemaBuilder = Common.ColumnSchemaPB.newBuilder();
for (ColumnSchema col : schema.getColumns()) {
columns.add(columnToPb(schemaBuilder, col, flags));
int id = schema.hasColumnIds() ? schema.getColumnId(col.getName()) : -1;
columns.add(columnToPb(schemaBuilder, id, col, flags));
schemaBuilder.clear();
}
return columns;
Expand All @@ -74,31 +75,40 @@ public static Common.SchemaPB schemaToPb(Schema schema) {
return schemaToPb(schema, EnumSet.noneOf(SchemaPBConversionFlags.class));
}

public static Common.SchemaPB schemaToPb(
Schema schema, EnumSet<SchemaPBConversionFlags> flags) {
public static Common.SchemaPB schemaToPb(Schema schema,
EnumSet<SchemaPBConversionFlags> flags) {
Common.SchemaPB.Builder builder = Common.SchemaPB.newBuilder();
builder.addAllColumns(schemaToListPb(schema, flags));
return builder.build();
}

public static Common.ColumnSchemaPB columnToPb(ColumnSchema column) {
return columnToPb(Common.ColumnSchemaPB.newBuilder(), column);
return columnToPb(Common.ColumnSchemaPB.newBuilder(), -1, column);
}

public static Common.ColumnSchemaPB columnToPb(Common.ColumnSchemaPB.Builder schemaBuilder,
int colId,
ColumnSchema column) {
return columnToPb(schemaBuilder, column, EnumSet.noneOf(SchemaPBConversionFlags.class));
return columnToPb(schemaBuilder,
colId,
column,
EnumSet.noneOf(SchemaPBConversionFlags.class));
}

public static Common.ColumnSchemaPB columnToPb(Common.ColumnSchemaPB.Builder schemaBuilder,
ColumnSchema column, EnumSet<SchemaPBConversionFlags> flags) {
int colId,
ColumnSchema column,
EnumSet<SchemaPBConversionFlags> flags) {
schemaBuilder
.setName(column.getName())
.setType(column.getWireType())
.setIsKey(column.isKey())
.setIsNullable(column.isNullable())
.setCfileBlockSize(column.getDesiredBlockSize());

if (!flags.contains(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID) && colId >= 0) {
schemaBuilder.setId(colId);
}
if (column.getEncoding() != null) {
schemaBuilder.setEncoding(column.getEncoding().getInternalPbType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

import com.google.common.collect.ImmutableList;
Expand All @@ -37,6 +38,7 @@
import org.apache.kudu.Type;
import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
import org.apache.kudu.client.PartitionSchema.RangeSchema;
import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
import org.apache.kudu.util.DecimalUtil;

public class TestKeyEncoding {
Expand All @@ -55,9 +57,10 @@ private static Schema buildSchema(ColumnSchemaBuilder... columns) {
int i = 0;
Common.SchemaPB.Builder pb = Common.SchemaPB.newBuilder();
for (ColumnSchemaBuilder column : columns) {
Common.ColumnSchemaPB.Builder columnPb =
ProtobufHelper.columnToPb(column.build()).toBuilder();
columnPb.setId(i++);
Common.ColumnSchemaPB columnPb =
ProtobufHelper.columnToPb(Common.ColumnSchemaPB.newBuilder(),
i++,
column.build());
pb.addColumns(columnPb);
}
return ProtobufHelper.pbToSchema(pb.build());
Expand Down
Loading

0 comments on commit 0f2946b

Please sign in to comment.