Skip to content

Commit

Permalink
HIVE-7800 - Parquet Column Index Access Schema Size Checking (Daniel Weeks via Brock)
Browse files Browse the repository at this point in the history

git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1629752 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Brock Noland committed Oct 6, 2014
1 parent 1631273 commit 2f9df52
Showing 1 changed file with 27 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,52 +75,58 @@ public parquet.hadoop.api.ReadSupport.ReadContext init(final Configuration confi
final Map<String, String> keyValueMetaData, final MessageType fileSchema) {
final String columns = configuration.get(IOConstants.COLUMNS);
final Map<String, String> contextMetadata = new HashMap<String, String>();
final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
if (columns != null) {
final List<String> listColumns = getColumns(columns);
final Map<String, String> lowerCaseFileSchemaColumns = new HashMap<String,String>();
for (ColumnDescriptor c : fileSchema.getColumns()) {
lowerCaseFileSchemaColumns.put(c.getPath()[0].toLowerCase(), c.getPath()[0]);
}
final List<Type> typeListTable = new ArrayList<Type>();
for (String col : listColumns) {
col = col.toLowerCase();
// listColumns contains partition columns which are metadata only
if (lowerCaseFileSchemaColumns.containsKey(col)) {
typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
} else {
// below allows schema evolution
typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
if(indexAccess) {
for (int index = 0; index < listColumns.size(); index++) {
//Take columns based on index or pad the field
if(index < fileSchema.getFieldCount()) {
typeListTable.add(fileSchema.getType(index));
} else {
//prefixing with '_mask_' to ensure no conflict with named
//columns in the file schema
typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_mask_"+listColumns.get(index)));
}
}
} else {
for (String col : listColumns) {
col = col.toLowerCase();
// listColumns contains partition columns which are metadata only
if (lowerCaseFileSchemaColumns.containsKey(col)) {
typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
} else {
// below allows schema evolution
typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
}
}
}
MessageType tableSchema = new MessageType(TABLE_SCHEMA, typeListTable);
contextMetadata.put(HIVE_SCHEMA_KEY, tableSchema.toString());

MessageType requestedSchemaByUser = tableSchema;
final List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);

final List<Type> typeListWanted = new ArrayList<Type>();
final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);

for (final Integer idx : indexColumnsWanted) {
if (idx < listColumns.size()) {
String col = listColumns.get(idx);
if (indexAccess) {
typeListWanted.add(tableSchema.getType(col));
typeListWanted.add(fileSchema.getFields().get(idx));
} else {
col = col.toLowerCase();
if (lowerCaseFileSchemaColumns.containsKey(col)) {
typeListWanted.add(tableSchema.getType(lowerCaseFileSchemaColumns.get(col)));
} else {
// should never occur?
String msg = "Column " + col + " at index " + idx + " does not exist in " +
lowerCaseFileSchemaColumns;
throw new IllegalStateException(msg);
}
}
}
}
requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
typeListWanted), fileSchema, configuration);

MessageType requestedSchemaByUser = new MessageType(fileSchema.getName(), typeListWanted);
return new ReadContext(requestedSchemaByUser, contextMetadata);
} else {
contextMetadata.put(HIVE_SCHEMA_KEY, fileSchema.toString());
Expand All @@ -147,26 +153,7 @@ public RecordMaterializer<ArrayWritable> prepareForRead(final Configuration conf
throw new IllegalStateException("ReadContext not initialized properly. " +
"Don't know the Hive Schema.");
}
final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
final MessageType tableSchema = MessageTypeParser.parseMessageType(metadata.get(HIVE_SCHEMA_KEY));
return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
}

/**
 * Resolves the requested schema by file-column position rather than by name.
 *
 * <p>When {@code PARQUET_COLUMN_INDEX_ACCESS} is enabled, each field in the
 * requested schema is replaced with the file schema's type at the position that
 * field's name occupies in the configured Hive column list. When the flag is
 * disabled, the requested schema is returned untouched.
 *
 * @param requestedSchema the schema built from the requested Hive columns
 * @param fileSchema the schema read from the Parquet file footer
 * @param configuration job configuration carrying the column list and flag
 * @return the position-resolved schema, or {@code requestedSchema} unchanged
 */
private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType fileSchema,
    Configuration configuration) {
  if (!configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false)) {
    // Name-based access: nothing to remap.
    return requestedSchema;
  }
  final List<String> columnNames = getColumns(configuration.get(IOConstants.COLUMNS));
  final List<Type> resolvedTypes = new ArrayList<Type>();
  for (final Type field : requestedSchema.getFields()) {
    // Position of this field within the Hive column list selects the file column.
    final int position = columnNames.indexOf(field.getName());
    resolvedTypes.add(fileSchema.getType(position));
  }
  return new MessageType(requestedSchema.getName(), resolvedTypes);
}
}
}

0 comments on commit 2f9df52

Please sign in to comment.