Only use BytesHiveRecordCursor for ColumnarSerDe
This fixes reading data for tables that use LazyBinaryColumnarSerDe.
electrum committed Oct 31, 2013
1 parent 40e81d6 commit 9728f87
Showing 3 changed files with 70 additions and 16 deletions.
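Background for the change (context, not from the commit itself): RCFile splits are read through the same input format no matter which columnar SerDe a partition uses, so the record reader hands back BytesRefArrayWritable for text-encoded (ColumnarSerDe) and binary-encoded (LazyBinaryColumnarSerDe) data alike. The old check "recordReader.createValue() instanceof BytesRefArrayWritable" therefore routed binary RCFile partitions to BytesHiveRecordCursor, which only understands the text encoding. A minimal sketch of the ambiguity, assuming Hive's 0.x serde2 API (class names come from the diff below; the sketch is illustrative, not code from this commit):

    import org.apache.hadoop.hive.serde2.Deserializer;
    import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
    import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;

    public class ColumnarSerDeAmbiguity
    {
        public static void main(String[] args) throws Exception
        {
            // Both RCFile SerDes report the same serialized class, so the value
            // type produced by the record reader cannot tell them apart:
            System.out.println(new ColumnarSerDe().getSerializedClass().getSimpleName());           // BytesRefArrayWritable
            System.out.println(new LazyBinaryColumnarSerDe().getSerializedClass().getSimpleName()); // BytesRefArrayWritable

            // But the binary SerDe is not a ColumnarSerDe: only ColumnarSerDe
            // text-encodes column values the way BytesHiveRecordCursor expects.
            Deserializer binary = new LazyBinaryColumnarSerDe();
            System.out.println(binary instanceof ColumnarSerDe); // false
        }
    }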
45 changes: 34 additions & 11 deletions presto-hive/create-test.sql
@@ -23,6 +23,7 @@ CREATE TABLE presto_test_unpartitioned (
   t_tinyint TINYINT
 )
 COMMENT 'Presto test data'
+STORED AS TEXTFILE
 TBLPROPERTIES ('RETENTION'='-1')
 ;

@@ -118,24 +119,46 @@ DROP TABLE tmp_presto_test_load;

 ALTER TABLE presto_test SET FILEFORMAT RCFILE;
+ALTER TABLE presto_test SET SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe';
-ALTER TABLE presto_test ADD PARTITION (ds='2012-12-29', file_format='rcfile', dummy=0);
-INSERT INTO TABLE presto_test PARTITION (ds='2012-12-29', file_format='rcfile', dummy=0)
+ALTER TABLE presto_test ADD PARTITION (ds='2012-12-29', file_format='rcfile-text', dummy=0);
+INSERT INTO TABLE presto_test PARTITION (ds='2012-12-29', file_format='rcfile-text', dummy=0)
 SELECT
-  CASE WHEN n % 19 = 0 THEN NULL ELSE 'rcfile test' END
+  CASE WHEN n % 19 = 0 THEN NULL ELSE 'rcfile-text test' END
 , 1 + n
 , 2 + n
 , 3 + n
 , 4 + n + CASE WHEN n % 13 = 0 THEN NULL ELSE 0 END
 , 5.1 + n
 , 6.2 + n
-, CASE WHEN n % 29 = 0 THEN NULL ELSE map('format', 'rcfile') END
+, CASE WHEN n % 29 = 0 THEN NULL ELSE map('format', 'rcfile-text') END
 , CASE n % 3 WHEN 0 THEN false WHEN 1 THEN true ELSE NULL END
 , CASE WHEN n % 17 = 0 THEN NULL ELSE '2011-05-06 07:08:09.1234567' END
-, CASE WHEN n % 23 = 0 THEN NULL ELSE CAST('rcfile test' AS BINARY) END
-, CASE WHEN n % 27 = 0 THEN NULL ELSE array('rcfile', 'test', 'data') END
+, CASE WHEN n % 23 = 0 THEN NULL ELSE CAST('rcfile-text test' AS BINARY) END
+, CASE WHEN n % 27 = 0 THEN NULL ELSE array('rcfile-text', 'test', 'data') END
 , CASE WHEN n % 31 = 0 THEN NULL ELSE
-    map(1, array(named_struct('s_string', 'rcfile-a', 's_double', 0.1),
-                 named_struct('s_string' , 'rcfile-b', 's_double', 0.2))) END
+    map(1, array(named_struct('s_string', 'rcfile-text-a', 's_double', 0.1),
+                 named_struct('s_string' , 'rcfile-text-b', 's_double', 0.2))) END
 FROM tmp_presto_test LIMIT 100;

+ALTER TABLE presto_test SET FILEFORMAT RCFILE;
+ALTER TABLE presto_test SET SERDE 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe';
+ALTER TABLE presto_test ADD PARTITION (ds='2012-12-29', file_format='rcfile-binary', dummy=2);
+INSERT INTO TABLE presto_test PARTITION (ds='2012-12-29', file_format='rcfile-binary', dummy=2)
+SELECT
+  CASE WHEN n % 19 = 0 THEN NULL ELSE 'rcfile-binary test' END
+, 201 + n
+, 202 + n
+, 203 + n
+, 204 + n + CASE WHEN n % 13 = 0 THEN NULL ELSE 0 END
+, 205.1 + n
+, 206.2 + n
+, CASE WHEN n % 29 = 0 THEN NULL ELSE map('format', 'rcfile-binary') END
+, CASE n % 3 WHEN 0 THEN false WHEN 1 THEN true ELSE NULL END
+, CASE WHEN n % 17 = 0 THEN NULL ELSE '2011-05-06 07:08:09.1234567' END
+, CASE WHEN n % 23 = 0 THEN NULL ELSE CAST('rcfile-binary test' AS BINARY) END
+, CASE WHEN n % 27 = 0 THEN NULL ELSE array('rcfile-binary', 'test', 'data') END
+, CASE WHEN n % 31 = 0 THEN NULL ELSE
+    map(1, array(named_struct('s_string', 'rcfile-binary-a', 's_double', 0.1),
+                 named_struct('s_string' , 'rcfile-binary-b', 's_double', 0.2))) END
+FROM tmp_presto_test LIMIT 100;

 ALTER TABLE presto_test SET FILEFORMAT SEQUENCEFILE;
@@ -204,19 +227,19 @@ INSERT OVERWRITE TABLE presto_test_bucketed_by_string_int
 PARTITION (ds='2012-12-29')
 SELECT t_string, t_tinyint, t_smallint, t_int, t_bigint, t_float, t_double, t_boolean
 FROM presto_test
-WHERE ds = '2012-12-29'
+WHERE file_format <> 'rcfile-binary'
 ;

 INSERT OVERWRITE TABLE presto_test_bucketed_by_bigint_boolean
 PARTITION (ds='2012-12-29')
 SELECT t_string, t_tinyint, t_smallint, t_int, t_bigint, t_float, t_double, t_boolean
 FROM presto_test
-WHERE ds = '2012-12-29'
+WHERE file_format <> 'rcfile-binary'
 ;

 INSERT OVERWRITE TABLE presto_test_bucketed_by_double_float
 PARTITION (ds='2012-12-29')
 SELECT t_string, t_tinyint, t_smallint, t_int, t_bigint, t_float, t_double, t_boolean
 FROM presto_test
-WHERE ds = '2012-12-29'
+WHERE file_format <> 'rcfile-binary'
 ;
14 changes: 13 additions & 1 deletion HiveRecordSet.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -54,6 +55,7 @@
 import static com.google.common.base.Predicates.not;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Lists.transform;
+import static org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer;
 import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;

 public class HiveRecordSet
@@ -108,7 +110,7 @@ public RecordCursor cursor()

         RecordReader<?, ?> recordReader = createRecordReader(split, configuration, wrappedPath);

-        if (recordReader.createValue() instanceof BytesRefArrayWritable) {
+        if (usesColumnarSerDe(split)) {
             return new BytesHiveRecordCursor<>(
                     bytesRecordReader(recordReader),
                     split.getLength(),
@@ -137,6 +139,16 @@ private static RecordReader<?, BytesRefArrayWritable> bytesRecordReader(RecordRe
         return (RecordReader<?, ? extends Writable>) recordReader;
     }

+    private static boolean usesColumnarSerDe(HiveSplit split)
+    {
+        try {
+            return getDeserializer(null, split.getSchema()) instanceof ColumnarSerDe;
+        }
+        catch (MetaException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
     private static HiveColumnHandle getFirstPrimitiveColumn(String clientId, Properties schema)
     {
         try {
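How the new check resolves it: a split's schema Properties name the SerDe class under serialization.lib, and MetaStoreUtils.getDeserializer instantiates and initializes that class, so usesColumnarSerDe dispatches on the actual SerDe rather than on the reader's value type. A minimal sketch under assumed inputs (the three-property schema below is invented for illustration; real splits carry the full table or partition properties from the metastore):

    import java.util.Properties;

    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
    import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;

    import static org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer;

    public class CursorDispatchSketch
    {
        public static void main(String[] args) throws MetaException
        {
            System.out.println(isColumnar(ColumnarSerDe.class.getName()));           // true  -> BytesHiveRecordCursor
            System.out.println(isColumnar(LazyBinaryColumnarSerDe.class.getName())); // false -> GenericHiveRecordCursor
        }

        private static boolean isColumnar(String serdeLib) throws MetaException
        {
            // Assumed minimal schema; just enough for the SerDe to initialize.
            Properties schema = new Properties();
            schema.setProperty("serialization.lib", serdeLib);
            schema.setProperty("columns", "t_string");
            schema.setProperty("columns.types", "string");
            // Same call the new usesColumnarSerDe(split) makes, with a null conf.
            return getDeserializer(null, schema) instanceof ColumnarSerDe;
        }
    }

Since LazyBinaryColumnarSerDe does not extend ColumnarSerDe, binary RCFile partitions now fall through to GenericHiveRecordCursor, which lets the SerDe itself deserialize each row.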
27 changes: 23 additions & 4 deletions AbstractTestHiveClient.java
@@ -113,8 +113,12 @@ public void setupHive(String connectorId, String databaseName)

         partitions = ImmutableSet.<Partition>of(
                 new HivePartition(table,
-                        "ds=2012-12-29/file_format=rcfile/dummy=0",
-                        ImmutableMap.<ColumnHandle, Object>of(dsColumn, "2012-12-29", fileFormatColumn, "rcfile", dummyColumn, 0L),
+                        "ds=2012-12-29/file_format=rcfile-text/dummy=0",
+                        ImmutableMap.<ColumnHandle, Object>of(dsColumn, "2012-12-29", fileFormatColumn, "rcfile-text", dummyColumn, 0L),
                         Optional.<Integer>absent()),
+                new HivePartition(table,
+                        "ds=2012-12-29/file_format=rcfile-binary/dummy=2",
+                        ImmutableMap.<ColumnHandle, Object>of(dsColumn, "2012-12-29", fileFormatColumn, "rcfile-binary", dummyColumn, 2L),
+                        Optional.<Integer>absent()),
                 new HivePartition(table,
                         "ds=2012-12-29/file_format=sequencefile/dummy=4",
@@ -308,7 +312,7 @@ public void testGetPartitionSplitsBatch()
         Iterable<Split> iterator = splitManager.getPartitionSplits(tableHandle, partitions);

         List<Split> splits = ImmutableList.copyOf(iterator);
-        assertEquals(splits.size(), 3);
+        assertEquals(splits.size(), partitions.size());
     }

     @Test
@@ -507,6 +511,7 @@ public void testGetRecords()
         long rowNumber = 0;
         long completedBytes = 0;
         try (RecordCursor cursor = recordSetProvider.getRecordSet(hiveSplit, columnHandles).cursor()) {
+            assertRecordCursorType(cursor, fileType);
             assertEquals(cursor.getTotalBytes(), hiveSplit.getLength());

             while (cursor.advanceNextPosition()) {
@@ -624,6 +629,7 @@ public void testGetPartialRecords()

         long rowNumber = 0;
         try (RecordCursor cursor = recordSetProvider.getRecordSet(hiveSplit, columnHandles).cursor()) {
+            assertRecordCursorType(cursor, fileType);
             while (cursor.advanceNextPosition()) {
                 rowNumber++;

@@ -656,6 +662,7 @@ public void testGetRecordsUnpartitioned()

         long rowNumber = 0;
         try (RecordCursor cursor = recordSetProvider.getRecordSet(split, columnHandles).cursor()) {
+            assertRecordCursorType(cursor, "textfile");
             assertEquals(cursor.getTotalBytes(), hiveSplit.getLength());

             while (cursor.advanceNextPosition()) {
@@ -709,8 +716,10 @@ private TableHandle getTableHandle(SchemaTableName tableName)
     private static long getBaseValueForFileType(String fileType)
     {
         switch (fileType) {
-            case "rcfile":
+            case "rcfile-text":
                 return 0;
+            case "rcfile-binary":
+                return 200;
             case "sequencefile":
                 return 400;
             case "textfile":
@@ -720,6 +729,16 @@ private static long getBaseValueForFileType(String fileType)
         }
     }

+    private static void assertRecordCursorType(RecordCursor cursor, String fileType)
+    {
+        if (fileType.equals("rcfile-text")) {
+            assertInstanceOf(cursor, BytesHiveRecordCursor.class, fileType);
+        }
+        else {
+            assertInstanceOf(cursor, GenericHiveRecordCursor.class, fileType);
+        }
+    }
+
     private static void assertReadFields(RecordCursor cursor, List<ColumnMetadata> schema)
     {
         for (int columnIndex = 0; columnIndex < schema.size(); columnIndex++) {