Skip to content

Commit

Permalink
enable java enums to be queried in sql (apache#3202)
Browse files Browse the repository at this point in the history
### Motivation

Allow enums in POJOs to be visible in sql for querying

Enums in POJOs will be presented as VARCHAR in Presto
  • Loading branch information
jerrypeng authored and sijie committed Dec 18, 2018
1 parent ee53ab6 commit 5d24067
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,12 @@ static List<PulsarColumnMetadata> getColumns(String fieldName, Schema fieldSchem
} else if (fieldSchema.getType() == Schema.Type.MAP) {

} else if (fieldSchema.getType() == Schema.Type.ENUM) {
PulsarColumnMetadata columnMetadata = new PulsarColumnMetadata(fieldName,
convertType(fieldSchema.getType(), fieldSchema.getLogicalType()),
null, null, false, false,
fieldNames.toArray(new String[fieldNames.size()]),
positionIndices.toArray(new Integer[positionIndices.size()]));
columnMetadataList.add(columnMetadata);

} else if (fieldSchema.getType() == Schema.Type.FIXED) {

Expand Down Expand Up @@ -433,6 +439,8 @@ static Type convertType(Schema.Type avroType, LogicalType logicalType) {
return VarbinaryType.VARBINARY;
case STRING:
return VarcharType.VARCHAR;
case ENUM:
return VarcharType.VARCHAR;
default:
log.error("Cannot convert type: %s", avroType);
return null;
Expand All @@ -449,5 +457,6 @@ static boolean isPrimitiveType(Schema.Type type) {
|| Schema.Type.DOUBLE == type
|| Schema.Type.BYTES == type
|| Schema.Type.STRING == type;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ public static class Bar {
public int field1;
}

public enum TestEnum {
TEST_ENUM_1,
TEST_ENUM_2,
TEST_ENUM_3
}

public int field1;
public String field2;
public float field3;
Expand All @@ -156,6 +162,7 @@ public static class Bar {
@org.apache.avro.reflect.AvroSchema("{ \"type\": \"int\", \"logicalType\": \"date\" }")
public int date;
public TestPulsarConnector.Bar bar;
public TestEnum field7;
}

public static class Bar {
Expand Down Expand Up @@ -249,6 +256,8 @@ public static class Boo {
fooTypes.put("bar.test2.field5", BooleanType.BOOLEAN);
fooTypes.put("bar.test2.field6", BigintType.BIGINT);
fooTypes.put("bar.test2.foobar.field1", IntegerType.INTEGER);
// Enums currently map to VARCHAR
fooTypes.put("field7", VarcharType.VARCHAR);

topicsToNumEntries = new HashMap<>();
topicsToNumEntries.put(TOPIC_1.getSchemaName(), 1233L);
Expand Down Expand Up @@ -510,6 +519,19 @@ public static class Boo {
bar_test2_foobar_fieldNames1,
bar_test2_foobar_positionIndices1));

String[] fieldNames10 = {"field7"};
Integer[] positionIndices10 = {10};
fooFieldNames.put("field7", fieldNames10);
fooPositionIndices.put("field7", positionIndices10);
fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
"field7",
fooTypes.get("field7"),
false,
false,
fieldNames10,
positionIndices10));


fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream().map(
new Function<PulsarInternalColumn, PulsarColumnHandle>() {
@Override
Expand Down Expand Up @@ -565,7 +587,7 @@ public PulsarColumnHandle apply(PulsarInternalColumn pulsarInternalColumn) {
fooFunctions.put("bar.test2.field5", integer -> (integer + 1) % 32 == 0);
fooFunctions.put("bar.test2.field6", integer -> integer + 15L);
fooFunctions.put("bar.test2.foobar.field1", integer -> integer % 3);

fooFunctions.put("field7", integer -> Foo.TestEnum.values()[integer % Foo.TestEnum.values().length]);

} catch (Throwable e) {
System.out.println("Error: " + e);
Expand Down Expand Up @@ -844,6 +866,7 @@ public void run() {
foo.time = (int) fooFunctions.get("time").apply(count);
foo.date = (int) fooFunctions.get("date").apply(count);
foo.bar = bar;
foo.field7 = (Foo.TestEnum) fooFunctions.get("field7").apply(count);

PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder()
.setProducerName("test-producer").setSequenceId(positions.get(topic))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ public void testTopics() throws Exception {
} else if (fooColumnHandles.get(i).getName().equals("bar.test2.foobar.field1")) {
Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test2.foobar.field1").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field7")) {
Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), fooFunctions.get("field7").apply(count).toString().getBytes());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(fooColumnHandles.get(i).getName())) {
columnsSeen.add(fooColumnHandles.get(i).getName());
Expand Down

0 comments on commit 5d24067

Please sign in to comment.