Skip to content

Commit

Permalink
Add missed serialization schema argument for PulsarOutputFormat const…
Browse files Browse the repository at this point in the history
…ructor (apache#4373)

The constructor of PulsarOutputFormat expects a serializationSchema passed in otherwise it's member serializationSchema will be assigned by itself.
  • Loading branch information
liketic authored and sijie committed May 29, 2019
1 parent ba24d73 commit 4d1b2ff
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ public PulsarOutputFormat(String serviceUrl, String topicName, Authentication au
this.serializationSchema = serializationSchema;
}

public PulsarOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
public PulsarOutputFormat(final ClientConfigurationData clientConfigurationData,
final ProducerConfigurationData producerConfigurationData,
final SerializationSchema<T> serializationSchema) {
super(clientConfigurationData, producerConfigurationData);
Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
this.serializationSchema = serializationSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsNull() {
.topicName("testTopic")
.build();

new PulsarOutputFormat(clientConf, producerConf);
new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
}

@Test(expectedExceptions = IllegalArgumentException.class)
Expand All @@ -85,7 +85,7 @@ public void testPulsarOutputFormatConstructorV2WhenTopicNameIsNull() {
.topicName(null)
.build();

new PulsarOutputFormat(clientConf, producerConf);
new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
}

@Test(expectedExceptions = IllegalArgumentException.class)
Expand All @@ -98,7 +98,7 @@ public void testPulsarOutputFormatConstructorV2WhenTopicNameIsBlank() {
.topicName(StringUtils.EMPTY)
.build();

new PulsarOutputFormat(clientConf, producerConf);
new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
}

@Test(expectedExceptions = IllegalArgumentException.class)
Expand All @@ -111,7 +111,18 @@ public void testPulsarOutputFormatConstructorV2WhenServiceUrlIsBlank() {
.topicName("testTopic")
.build();

new PulsarOutputFormat(clientConf, producerConf);
new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
}

@Test(expectedExceptions = NullPointerException.class)
public void testPulsarOutputFormatConstructorV2WhenSerializationSchemaIsNull() {
ClientConfigurationData clientConf = ClientConfigurationData.builder()
.serviceUrl("testServiceUrl")
.build();
ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
.topicName("testTopic")
.build();
new PulsarOutputFormat(clientConf, producerConf, null);
}

@Test
Expand Down

0 comments on commit 4d1b2ff

Please sign in to comment.