Skip to content

Commit

Permalink
[FLINK-28609][Connector/Pulsar] PulsarSchema didn't get properly seri…
Browse files Browse the repository at this point in the history
…alized. (apache#20698)
  • Loading branch information
syhily authored Aug 30, 2022
1 parent e18782f commit d9bcbff
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.connector.pulsar.common.schema;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.IOUtils;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
Expand Down Expand Up @@ -163,8 +164,7 @@ private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IO
// Schema
int byteLen = ois.readInt();
byte[] schemaBytes = new byte[byteLen];
int read = ois.read(schemaBytes);
checkState(read == byteLen);
IOUtils.readFully(ois, schemaBytes, 0, byteLen);

// Type
int typeIdx = ois.readInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.pulsar.common.schema.SchemaType;
import org.junit.jupiter.api.Test;

import java.io.Serializable;

import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -117,9 +119,35 @@ void pulsarSchemaSerialization() throws Exception {
assertPulsarSchemaIsSerializable(new PulsarSchema<>(KV, Foo.class, FA.class));
}

@Test
void largeAvroSchemaSerialization() throws Exception {
Schema<LargeMessage> largeMessageSchema = Schema.AVRO(LargeMessage.class);
assertPulsarSchemaIsSerializable(
new PulsarSchema<>(largeMessageSchema, LargeMessage.class));
}

private <T> void assertPulsarSchemaIsSerializable(PulsarSchema<T> schema) throws Exception {
PulsarSchema<T> clonedSchema = InstantiationUtil.clone(schema);
assertEquals(clonedSchema.getSchemaInfo(), schema.getSchemaInfo());
assertEquals(clonedSchema.getRecordClass(), schema.getRecordClass());
}

/** A POJO Class which would generate a large schema by Avro. */
public static class LargeMessage implements Serializable {
private static final long serialVersionUID = 5364494369740402518L;

public String
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa;
public String
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb;
public String
cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc;
public String
dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd;
public String
eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee;
// the problem begins
public String
ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff;
}
}

0 comments on commit d9bcbff

Please sign in to comment.