From d9bcbffc006481c09a8d2e04aa05cc92cd5c80d2 Mon Sep 17 00:00:00 2001 From: Yufan Sheng Date: Tue, 30 Aug 2022 10:31:36 +0800 Subject: [PATCH] [FLINK-28609][Connector/Pulsar] PulsarSchema didn't get properly serialized. (#20698) --- .../pulsar/common/schema/PulsarSchema.java | 4 +-- .../common/schema/PulsarSchemaTest.java | 28 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java index 6ce91cdc67415..4c33d79d205b0 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.pulsar.common.schema; import org.apache.flink.annotation.Internal; +import org.apache.flink.util.IOUtils; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; @@ -163,8 +164,7 @@ private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IO // Schema int byteLen = ois.readInt(); byte[] schemaBytes = new byte[byteLen]; - int read = ois.read(schemaBytes); - checkState(read == byteLen); + IOUtils.readFully(ois, schemaBytes, 0, byteLen); // Type int typeIdx = ois.readInt(); diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java index 7011e169656d3..81074c993708f 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java @@ -36,6 +36,8 @@ import org.apache.pulsar.common.schema.SchemaType; import org.junit.jupiter.api.Test; +import java.io.Serializable; + import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -117,9 +119,35 @@ void pulsarSchemaSerialization() throws Exception { assertPulsarSchemaIsSerializable(new PulsarSchema<>(KV, Foo.class, FA.class)); } + @Test + void largeAvroSchemaSerialization() throws Exception { + Schema largeMessageSchema = Schema.AVRO(LargeMessage.class); + assertPulsarSchemaIsSerializable( + new PulsarSchema<>(largeMessageSchema, LargeMessage.class)); + } + private void assertPulsarSchemaIsSerializable(PulsarSchema schema) throws Exception { PulsarSchema clonedSchema = InstantiationUtil.clone(schema); assertEquals(clonedSchema.getSchemaInfo(), schema.getSchemaInfo()); assertEquals(clonedSchema.getRecordClass(), schema.getRecordClass()); } + + /** A POJO Class which would generate a large schema by Avro. */ + public static class LargeMessage implements Serializable { + private static final long serialVersionUID = 5364494369740402518L; + + public String + aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa; + public String + bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb; + public String + cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc; + public String + dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd; + public String + eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee; + // the problem begins + public String + ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff; + } }