From d9691b71b58159e49300de58d9e8efd87a81673b Mon Sep 17 00:00:00 2001
From: wpl <1269223860@qq.com>
Date: Sat, 22 Dec 2018 04:55:30 +0800
Subject: [PATCH] [issues 3232] integrate flink-json to pulsar (#3234)

* Implements a batch program on Pulsar topic by writing Flink DataSet as Avro.
* fix up
* remove avro generated
* fix up time-out
* fix up time-out
* fix up
* modify review content code
* modify Tests FAILURE
* run Tests
* run Tests
* fix up pulsar-flink and flink-consumer-source
* fix up pulsar-flink and flink-consumer-source
* add flink streaming table as a Pulsar stream that serializes data in Avro format.
* integrate flink-json to pulsar
---
 pulsar-flink/pom.xml                          |   6 +
 .../pulsar/PulsarJsonTableSink.java           |   2 +-
 .../serde/JsonRowDeserializationSchema.java   | 160 ------------------
 .../serde/JsonRowSerializationSchema.java     |  92 ----------
 4 files changed, 7 insertions(+), 253 deletions(-)
 delete mode 100644 pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
 delete mode 100644 pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java

diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index c891005c293ec..92eb04595709a 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -49,6 +49,12 @@
       <optional>true</optional>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-json</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-avro</artifactId>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
index 1a8d5e38eb67c..d6818fa4a56bc 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
@@ -20,7 +20,7 @@
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.connectors.pulsar.serde.JsonRowSerializationSchema;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
deleted file mode 100644
index 0235e61ebdc1e..0000000000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowDeserializationSchema.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.serde;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * Deserialization schema from JSON to {@link Row}.
- *
- * <p>Deserializes the byte[] messages as a JSON object and reads
- * the specified fields.
- *
- * <p>Failure during deserialization are forwarded as wrapped IOExceptions.
- */
-public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {
-
-    /*
-    What to do when detecting that a json line cannot be deserialized :
-    (1).false : Throw A IOException and Terminate application。
-    (2).true : Ignore the error line and add a null line。
-    */
-    private boolean ignoreJsonFormatError = false;
-
-
-    /**
-     *
-     * @return true or false
-     */
-    public boolean getIgnoreJsonFormatError() {
-        return ignoreJsonFormatError;
-    }
-
-    /**
-     * set ignoreJsonFormatError
-     * @param ignoreJsonFormatError
-     */
-    public void setIgnoreJsonFormatError(boolean ignoreJsonFormatError) {
-        this.ignoreJsonFormatError = ignoreJsonFormatError;
-    }
-
-    /**
-     * Type information describing the result type.
-     */
-    private final TypeInformation<Row> typeInfo;
-
-    /**
-     * Field names to parse. Indices match fieldTypes indices.
-     */
-    private final String[] fieldNames;
-
-    /**
-     * Types to parse fields as. Indices match fieldNames indices.
-     */
-    private final TypeInformation<?>[] fieldTypes;
-
-    /**
-     * Object mapper for parsing the JSON.
-     */
-    private final ObjectMapper objectMapper = new ObjectMapper();
-
-    /**
-     * Flag indicating whether to fail on a missing field.
-     */
-    private boolean failOnMissingField;
-
-    /**
-     * Creates a JSON deserialization schema for the given fields and types.
-     *
-     * @param typeInfo Type information describing the result type. The field names are used
-     *                 to parse the JSON file and so are the types.
-     */
-    public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
-        Preconditions.checkNotNull(typeInfo, "Type information");
-        this.typeInfo = typeInfo;
-
-        this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
-        this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
-    }
-
-    @Override
-    public Row deserialize(byte[] message) throws IOException {
-        try {
-            JsonNode root = objectMapper.readTree(message);
-
-            Row row = new Row(fieldNames.length);
-            for (int i = 0; i < fieldNames.length; i++) {
-                JsonNode node = root.get(fieldNames[i]);
-
-                if (node == null) {
-                    if (failOnMissingField) {
-                        throw new IllegalStateException("Failed to find field with name '"
-                            + fieldNames[i] + "'.");
-                    } else {
-                        row.setField(i, null);
-                    }
-                } else {
-                    // Read the value as specified type
-                    Object value = objectMapper.treeToValue(node, fieldTypes[i].getTypeClass());
-                    row.setField(i, value);
-                }
-            }
-
-            return row;
-        } catch (Throwable t) {
-            if (ignoreJsonFormatError) {
-                final int arity = typeInfo.getArity();
-                final Object[] nullsArray = new Object[arity];
-                return Row.of(nullsArray);
-            } else {
-                throw new IOException("Failed to deserialize JSON object.", t);
-            }
-        }
-    }
-
-    @Override
-    public boolean isEndOfStream(Row nextElement) {
-        return false;
-    }
-
-    @Override
-    public TypeInformation<Row> getProducedType() {
-        return typeInfo;
-    }
-
-    /**
-     * Configures the failure behaviour if a JSON field is missing.
-     *
-     * <p>By default, a missing field is ignored and the field is set to null.
-     *
-     * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
-     */
-    public void setFailOnMissingField(boolean failOnMissingField) {
-        this.failOnMissingField = failOnMissingField;
-    }
-
-}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java
deleted file mode 100644
index 503f01e2cc4e3..0000000000000
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/serde/JsonRowSerializationSchema.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flink.streaming.connectors.pulsar.serde;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Serialization schema that serializes an object into a JSON bytes.
- *
- * <p>Serializes the input {@link Row} object into a JSON string and
- * converts it into byte[].
- *
- * <p>Result byte[] messages can be deserialized using
- * {@link JsonRowDeserializationSchema}.
- */
-public class JsonRowSerializationSchema implements SerializationSchema<Row> {
-    /**
-     * Fields names in the input Row object.
-     */
-    private final String[] fieldNames;
-    /**
-     * Object mapper that is used to create output JSON objects.
-     */
-    private static ObjectMapper mapper = new ObjectMapper();
-
-    /**
-     * Creates a JSON serialization schema for the given fields and types.
-     *
-     * @param rowSchema The schema of the rows to encode.
-     */
-    public JsonRowSerializationSchema(RowTypeInfo rowSchema) {
-
-        Preconditions.checkNotNull(rowSchema);
-        String[] fieldNames = rowSchema.getFieldNames();
-        TypeInformation<?>[] fieldTypes = rowSchema.getFieldTypes();
-
-        // check that no field is composite
-        for (int i = 0; i < fieldTypes.length; i++) {
-            if (fieldTypes[i] instanceof CompositeType) {
-                throw new IllegalArgumentException("JsonRowSerializationSchema cannot encode rows with nested schema, " +
-                    "but field '" + fieldNames[i] + "' is nested: " + fieldTypes[i].toString());
-            }
-        }
-
-        this.fieldNames = fieldNames;
-    }
-
-    @Override
-    public byte[] serialize(Row row) {
-        if (row.getArity() != fieldNames.length) {
-            throw new IllegalStateException(String.format(
-                "Number of elements in the row %s is different from number of field names: %d", row, fieldNames.length));
-        }
-
-        ObjectNode objectNode = mapper.createObjectNode();
-
-        for (int i = 0; i < row.getArity(); i++) {
-            JsonNode node = mapper.valueToTree(row.getField(i));
-            objectNode.set(fieldNames[i], node);
-        }
-
-        try {
-            return mapper.writeValueAsBytes(objectNode);
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to serialize row", e);
-        }
-    }
-}
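
Note (not part of the patch): a minimal sketch of the flink-json schemas that PulsarJsonTableSink now delegates to in place of the deleted serde copies. The row type, field names, and the Flink 1.6-era public constructors are assumptions; unlike the deleted deserializer, the flink-json one has no ignoreJsonFormatError toggle, so malformed input surfaces as an IOException.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

public class JsonRowRoundTripSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical row type for a topic with two fields: (word STRING, count INT).
        TypeInformation<Row> rowType = Types.ROW_NAMED(
                new String[]{"word", "count"},
                Types.STRING, Types.INT);

        // These come from the flink-json module added to pulsar-flink/pom.xml,
        // not from the deleted org.apache.flink.streaming.connectors.pulsar.serde classes.
        JsonRowSerializationSchema serializer = new JsonRowSerializationSchema(rowType);
        JsonRowDeserializationSchema deserializer = new JsonRowDeserializationSchema(rowType);

        // Serialize a Row to JSON bytes and read it back.
        byte[] json = serializer.serialize(Row.of("pulsar", 42));
        Row copy = deserializer.deserialize(json);

        System.out.println(new String(json, "UTF-8") + " -> " + copy);
    }
}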