Skip to content

Commit

Permalink
[BEAM-11482] Thrift support for KafkaTableProvider (apache#13572)
Browse files Browse the repository at this point in the history
  • Loading branch information
ccciudatu authored Dec 30, 2020
1 parent 4827bea commit 9de4ea4
Show file tree
Hide file tree
Showing 17 changed files with 2,673 additions and 107 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
## New Features / Improvements

* ParquetIO add methods _readGenericRecords_ and _readFilesGenericRecords_ can read files with an unknown schema. See [PR-13554](https://github.com/apache/beam/pull/13554) and ([BEAM-11460](https://issues.apache.org/jira/browse/BEAM-11460))
* Added support for thrift in KafkaTableProvider ([BEAM-11482](https://issues.apache.org/jira/browse/BEAM-11482))
* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).

## Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,6 @@
<suppress files="org.apache.beam.sdk.io.thrift.TestThriftStruct.java" checks="[a-zA-Z0-9]*"/>
<suppress files="org.apache.beam.sdk.io.thrift.TestThriftInnerStruct.java" checks="[a-zA-Z0-9]*"/>
<suppress files="org.apache.beam.sdk.io.thrift.TestThriftUnion.java" checks="[a-zA-Z0-9]*"/>
<suppress files="org.apache.beam.sdk.extensions.sql.meta.provider.kafka.thrift.*" checks="[a-zA-Z0-9]*"/>

</suppressions>
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.ProcessFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

@Internal
public final class RowMessages {

private RowMessages() {}

public static <T> SimpleFunction<byte[], Row> bytesToRowFn(
SchemaProvider schemaProvider,
TypeDescriptor<T> typeDescriptor,
ProcessFunction<byte[], ? extends T> fromBytesFn) {
final SerializableFunction<T, Row> toRowFn =
checkArgumentNotNull(schemaProvider.toRowFunction(typeDescriptor));
return new BytesToRowFn<>(fromBytesFn, toRowFn);
}

public static <T> SimpleFunction<byte[], Row> bytesToRowFn(
SchemaProvider schemaProvider, TypeDescriptor<T> typeDescriptor, Coder<? extends T> coder) {
return bytesToRowFn(
schemaProvider, typeDescriptor, bytes -> coder.decode(new ByteArrayInputStream(bytes)));
}

private static final class BytesToRowFn<T> extends SimpleFunction<byte[], Row> {

private final ProcessFunction<byte[], ? extends T> fromBytesFn;
private final SerializableFunction<T, Row> toRowFn;

private BytesToRowFn(
ProcessFunction<byte[], ? extends T> fromBytesFn, SerializableFunction<T, Row> toRowFn) {
this.fromBytesFn = fromBytesFn;
this.toRowFn = toRowFn;
}

@Override
public Row apply(byte[] bytes) {
final T message;
try {
message = fromBytesFn.apply(bytes);
} catch (Exception e) {
throw new IllegalStateException("Could not decode bytes as message", e);
}
return toRowFn.apply(message);
}
}

public static <T> SimpleFunction<Row, byte[]> rowToBytesFn(
SchemaProvider schemaProvider,
TypeDescriptor<T> typeDescriptor,
ProcessFunction<? super T, byte[]> toBytesFn) {
final Schema schema = checkArgumentNotNull(schemaProvider.schemaFor(typeDescriptor));
final SerializableFunction<Row, T> fromRowFn =
checkArgumentNotNull(schemaProvider.fromRowFunction(typeDescriptor));
toBytesFn = checkArgumentNotNull(toBytesFn);
return new RowToBytesFn<>(schema, fromRowFn, toBytesFn);
}

public static <T> SimpleFunction<Row, byte[]> rowToBytesFn(
SchemaProvider schemaProvider, TypeDescriptor<T> typeDescriptor, Coder<? super T> coder) {
return rowToBytesFn(schemaProvider, typeDescriptor, message -> toBytes(coder, message));
}

private static <T> byte[] toBytes(Coder<? super T> coder, T message) throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
coder.encode(message, out);
return out.toByteArray();
}

private static final class RowToBytesFn<T> extends SimpleFunction<Row, byte[]> {

private final Schema schema;
private final SerializableFunction<Row, T> fromRowFn;
private final ProcessFunction<? super T, byte[]> toBytesFn;

private RowToBytesFn(
Schema schema,
SerializableFunction<Row, T> fromRowFn,
ProcessFunction<? super T, byte[]> toBytesFn) {
this.schema = schema;
this.fromRowFn = fromRowFn;
this.toBytesFn = toBytesFn;
}

@Override
public byte[] apply(Row row) {
if (!schema.equivalent(row.getSchema())) {
row = switchFieldsOrder(row);
}
final T message = fromRowFn.apply(row);
try {
return toBytesFn.apply(message);
} catch (Exception e) {
throw new IllegalStateException("Could not encode message as bytes", e);
}
}

private Row switchFieldsOrder(Row row) {
Row.Builder convertedRow = Row.withSchema(schema);
schema.getFields().forEach(field -> convertedRow.addValue(row.getValue(field.getName())));
return convertedRow.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
Expand All @@ -32,14 +31,14 @@
import org.apache.beam.sdk.schemas.FieldValueGetter;
import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider;
import org.apache.beam.sdk.schemas.RowMessages;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
import org.apache.beam.sdk.schemas.utils.ReflectUtils;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
Expand Down Expand Up @@ -130,28 +129,12 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema sche
"unchecked"
})
public static <T> SimpleFunction<byte[], Row> getProtoBytesToRowFn(Class<T> clazz) {
checkForMessageType(clazz);
return new ProtoBytesToRowFn(clazz);
}

private static class ProtoBytesToRowFn<T extends Message> extends SimpleFunction<byte[], Row> {
private final ProtoCoder<T> protoCoder;
private final SerializableFunction<T, Row> toRowFunction;

public ProtoBytesToRowFn(Class<T> clazz) {
this.protoCoder = ProtoCoder.of(clazz);
this.toRowFunction = new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(clazz));
}

@Override
public Row apply(byte[] bytes) {
try {
T message = protoCoder.getParser().parseFrom(bytes);
return toRowFunction.apply(message);
} catch (IOException e) {
throw new IllegalArgumentException("Could not decode row from proto payload.", e);
}
}
Class<Message> protoClass = ensureMessageType(clazz);
ProtoCoder<Message> protoCoder = ProtoCoder.of(protoClass);
return RowMessages.bytesToRowFn(
new ProtoMessageSchema(),
TypeDescriptor.of(protoClass),
bytes -> protoCoder.getParser().parseFrom(bytes));
}

// Other modules are not allowed to use non-vendored Message class
Expand All @@ -160,37 +143,9 @@ public Row apply(byte[] bytes) {
"unchecked"
})
public static <T> SimpleFunction<Row, byte[]> getRowToProtoBytesFn(Class<T> clazz) {
checkForMessageType(clazz);
return new RowToProtoBytesFn(clazz);
}

private static class RowToProtoBytesFn<T extends Message> extends SimpleFunction<Row, byte[]> {
private final SerializableFunction<Row, T> toMessageFunction;
private final Schema protoSchema;

public RowToProtoBytesFn(Class<T> clazz) {
ProtoMessageSchema messageSchema = new ProtoMessageSchema();
TypeDescriptor<T> typeDescriptor = TypeDescriptor.of(clazz);
this.toMessageFunction = messageSchema.fromRowFunction(typeDescriptor);
this.protoSchema = messageSchema.schemaFor(typeDescriptor);
}

@Override
public byte[] apply(Row row) {
if (!protoSchema.equivalent(row.getSchema())) {
row = switchFieldsOrder(row);
}
Message message = toMessageFunction.apply(row);
return message.toByteArray();
}

private Row switchFieldsOrder(Row row) {
Row.Builder convertedRow = Row.withSchema(protoSchema);
protoSchema
.getFields()
.forEach(field -> convertedRow.addValue(row.getValue(field.getName())));
return convertedRow.build();
}
Class<Message> protoClass = ensureMessageType(clazz);
return RowMessages.rowToBytesFn(
new ProtoMessageSchema(), TypeDescriptor.of(protoClass), Message::toByteArray);
}

private <T> void checkForDynamicType(TypeDescriptor<T> typeDescriptor) {
Expand All @@ -200,11 +155,12 @@ private <T> void checkForDynamicType(TypeDescriptor<T> typeDescriptor) {
}
}

private static <T> void checkForMessageType(Class<T> clazz) {
private static Class<Message> ensureMessageType(Class<?> clazz) {
checkArgument(
Message.class.isAssignableFrom(clazz),
"%s is not a subtype of %s",
clazz.getName(),
Message.class.getSimpleName());
return (Class<Message>) clazz;
}
}
2 changes: 2 additions & 0 deletions sdks/java/extensions/sql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ applyJavaNature(
generatedClassPatterns: [
/^org\.apache\.beam\.sdk\.extensions\.sql\.impl\.parser\.impl.*/,
/^org\.apache\.beam\.sdk\.extensions\.sql\.meta\.provider\.kafka\.KafkaMessages/,
/^org\.apache\.beam\.sdk\.extensions\.sql\.meta\.provider\.kafka\.thrift.*/,
],
automaticModuleName: 'org.apache.beam.sdk.extensions.sql',
// javacc generated code produces lint warnings
Expand Down Expand Up @@ -77,6 +78,7 @@ dependencies {
provided project(":sdks:java:io:google-cloud-platform")
compile project(":sdks:java:io:mongodb")
provided project(":sdks:java:io:parquet")
provided project(":sdks:java:io:thrift")
provided library.java.jackson_dataformat_xml
provided library.java.hadoop_client
provided library.java.kafka_clients
Expand Down
Loading

0 comments on commit 9de4ea4

Please sign in to comment.