Skip to content

Commit

Permalink
Merge pull request apache#10984 from [BEAM-9394] DynamicMessage handl…
Browse files Browse the repository at this point in the history
…ing of empty map violates nullability

[BEAM-9394] DynamicMessage handling of empty map violates schema nullability
  • Loading branch information
alexvanboxel authored Feb 27, 2020
2 parents 860131b + 34d7c85 commit 777a401
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,10 @@ static class MapConvert extends Convert<Map, Map> {
@Override
Map getFromProtoMessage(Message message) {
List<Message> list = (List<Message>) message.getField(getFieldDescriptor(message));
Map<Object, Object> rowMap = new HashMap<>();
if (list.size() == 0) {
return null;
return rowMap;
}
Map<Object, Object> rowMap = new HashMap<>();
list.forEach(
entryMessage -> {
Descriptors.Descriptor entryDescriptor = entryMessage.getDescriptorForType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_PROTO;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_ROW;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_PROTO;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_ROW;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_BOOL;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_INT32;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_PRIMITIVE;
Expand Down Expand Up @@ -126,6 +130,22 @@ public void testRepeatedRowToProto() {
assertEquals(REPEATED_PROTO.toString(), fromRow.apply(REPEATED_ROW).toString());
}

@Test
public void testNullRepeatedProtoToRow() throws InvalidProtocolBufferException {
ProtoDynamicMessageSchema schemaProvider =
schemaFromDescriptor(RepeatPrimitive.getDescriptor());
SerializableFunction<DynamicMessage, Row> toRow = schemaProvider.getToRowFunction();
assertEquals(NULL_REPEATED_ROW, toRow.apply(toDynamic(NULL_REPEATED_PROTO)));
}

@Test
public void testNullRepeatedRowToProto() {
ProtoDynamicMessageSchema schemaProvider =
schemaFromDescriptor(RepeatPrimitive.getDescriptor());
SerializableFunction<Row, DynamicMessage> fromRow = schemaProvider.getFromRowFunction();
assertEquals(NULL_REPEATED_PROTO.toString(), fromRow.apply(NULL_REPEATED_ROW).toString());
}

// Test map type
@Test
public void testMapSchema() {
Expand All @@ -148,6 +168,21 @@ public void testMapRowToProto() {
assertEquals(MAP_PRIMITIVE_PROTO.toString(), fromRow.apply(MAP_PRIMITIVE_ROW).toString());
}

@Test
public void testNullMapProtoToRow() throws InvalidProtocolBufferException {
ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(MapPrimitive.getDescriptor());
SerializableFunction<DynamicMessage, Row> toRow = schemaProvider.getToRowFunction();
assertEquals(NULL_MAP_PRIMITIVE_ROW, toRow.apply(toDynamic(NULL_MAP_PRIMITIVE_PROTO)));
}

@Test
public void testNullMapRowToProto() {
ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(MapPrimitive.getDescriptor());
SerializableFunction<Row, DynamicMessage> fromRow = schemaProvider.getFromRowFunction();
assertEquals(
NULL_MAP_PRIMITIVE_PROTO.toString(), fromRow.apply(NULL_MAP_PRIMITIVE_ROW).toString());
}

@Test
public void testNestedSchema() {
ProtoDynamicMessageSchema schemaProvider = schemaFromDescriptor(Nested.getDescriptor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_PROTO;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_ROW;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NESTED_SCHEMA;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_PROTO;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_MAP_PRIMITIVE_ROW;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_PROTO;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.NULL_REPEATED_ROW;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_BOOL;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_INT32;
import static org.apache.beam.sdk.extensions.protobuf.TestProtoSchemas.ONEOF_PROTO_PRIMITIVE;
Expand Down Expand Up @@ -158,6 +162,20 @@ public void testRepeatedRowToProto() {
assertEquals(REPEATED_PROTO, fromRow.apply(REPEATED_ROW));
}

@Test
public void testNullRepeatedProtoToRow() {
SerializableFunction<RepeatPrimitive, Row> toRow =
new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(RepeatPrimitive.class));
assertEquals(NULL_REPEATED_ROW, toRow.apply(NULL_REPEATED_PROTO));
}

@Test
public void testNullRepeatedRowToProto() {
SerializableFunction<Row, RepeatPrimitive> fromRow =
new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(RepeatPrimitive.class));
assertEquals(NULL_REPEATED_PROTO, fromRow.apply(NULL_REPEATED_ROW));
}

// Test map type
@Test
public void testMapSchema() {
Expand All @@ -179,6 +197,20 @@ public void testMapRowToProto() {
assertEquals(MAP_PRIMITIVE_PROTO, fromRow.apply(MAP_PRIMITIVE_ROW));
}

@Test
public void testNullMapProtoToRow() {
SerializableFunction<MapPrimitive, Row> toRow =
new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(MapPrimitive.class));
assertEquals(NULL_MAP_PRIMITIVE_ROW, toRow.apply(NULL_MAP_PRIMITIVE_PROTO));
}

@Test
public void testNullMapRowToProto() {
SerializableFunction<Row, MapPrimitive> fromRow =
new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(MapPrimitive.class));
assertEquals(NULL_MAP_PRIMITIVE_PROTO, fromRow.apply(NULL_MAP_PRIMITIVE_ROW));
}

@Test
public void testNestedSchema() {
Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(Nested.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,28 @@ class TestProtoSchemas {
ImmutableList.of(ByteString.copyFrom(BYTE_ARRAY), ByteString.copyFrom(BYTE_ARRAY)))
.build();

static final Row NULL_REPEATED_ROW =
Row.withSchema(REPEATED_SCHEMA)
.addArray()
.addArray()
.addArray()
.addArray()
.addArray()
.addArray()
.addArray()
.addArray()
.addArray()
.addArray()
.addArray()
.addArray()
.addArray()
.addArray()
.addArray()
.build();

// A sample instance of the proto.
static final RepeatPrimitive NULL_REPEATED_PROTO = RepeatPrimitive.newBuilder().build();

// The schema for the MapPrimitive proto.
static final Schema MAP_PRIMITIVE_SCHEMA =
Schema.builder()
Expand Down Expand Up @@ -274,6 +296,18 @@ class TestProtoSchemas {
"k1", ByteString.copyFrom(BYTE_ARRAY), "k2", ByteString.copyFrom(BYTE_ARRAY)))
.build();

// A sample instance of the row.
static final Row NULL_MAP_PRIMITIVE_ROW =
Row.withSchema(MAP_PRIMITIVE_SCHEMA)
.addValue(ImmutableMap.of())
.addValue(ImmutableMap.of())
.addValue(ImmutableMap.of())
.addValue(ImmutableMap.of())
.build();

// A sample instance of the proto.
static final MapPrimitive NULL_MAP_PRIMITIVE_PROTO = MapPrimitive.newBuilder().build();

// The schema for the Nested proto.
static final Schema NESTED_SCHEMA =
Schema.builder()
Expand Down

0 comments on commit 777a401

Please sign in to comment.