Skip to content

Commit

Permalink
Add Flink - Pulsar Batch Sink Support (apache#2979)
Browse files Browse the repository at this point in the history
### Motivation

This PR adds Flink - Pulsar `Batch Sink` support. If a user works with the Flink `DataSet` API and would like to write these `DataSets` to Pulsar, this sink can help.

*Ref:* [Flink Batch Sink API](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#data-sinks)

### Modifications

Please find the change-set as follows:
- Defines `PulsarOutputFormat` to write Flink Batch `DataSets` into Pulsar.
- UT Coverage
- `FlinkPulsarBatchSinkExample` to show how to use and to be used by users.
- `README.md` documentation
- Minor `javadoc` fix
  • Loading branch information
erenavsarogullari authored and sijie committed Nov 16, 2018
1 parent 2c8c288 commit aefbaac
Show file tree
Hide file tree
Showing 6 changed files with 378 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.batch.connectors.pulsar;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.function.Function;

/**
* Flink Batch Sink to write DataSets into a Pulsar topic.
*/
public class PulsarOutputFormat<T> extends RichOutputFormat<T> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(PulsarOutputFormat.class);

    // Configuration is held in instance fields so that it is part of the serialized form of
    // this format. Static fields are not serialized, so they would be unset in any JVM other
    // than the one that invoked the constructor.
    private final String serviceUrl;
    private final String topicName;
    private final SerializationSchema<T> serializationSchema;

    // Runtime state: created in open(), released in close(), never serialized.
    private transient Function<Throwable, MessageId> failureCallback;
    private transient PulsarClient client;
    private transient Producer<byte[]> producer;

    /**
     * Creates an output format that writes every record to the given Pulsar topic.
     *
     * @param serviceUrl          Pulsar service URL; must not be null
     * @param topicName           name of the target topic; must not be blank
     * @param serializationSchema converts each record into the bytes of a Pulsar message; must not be null
     * @throws NullPointerException     if {@code serviceUrl} or {@code serializationSchema} is null
     * @throws IllegalArgumentException if {@code topicName} is null or blank
     */
    public PulsarOutputFormat(String serviceUrl, String topicName, SerializationSchema<T> serializationSchema) {
        Preconditions.checkNotNull(serviceUrl, "serviceUrl must not be null.");
        Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName must not be blank.");
        Preconditions.checkNotNull(serializationSchema, "serializationSchema must not be null.");

        this.serviceUrl = serviceUrl;
        this.topicName = topicName;
        this.serializationSchema = serializationSchema;

        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic {}", this.topicName);
    }

    /**
     * No-op: all settings are supplied through the constructor.
     */
    @Override
    public void configure(Configuration configuration) {

    }

    /**
     * Creates the Pulsar client and producer used by this instance.
     *
     * @param taskNumber index of this parallel instance (unused)
     * @param numTasks   total number of parallel instances (unused)
     * @throws IOException if the client or the producer cannot be created
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        this.failureCallback = cause -> {
            LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
            return null;
        };

        this.producer = createPulsarProducer();
    }

    /**
     * Serializes the record and hands it to the producer asynchronously.
     * A failed send is logged by the failure callback; it is not rethrown from this method.
     *
     * @param t record to write
     */
    @Override
    public void writeRecord(T t) throws IOException {
        byte[] data = this.serializationSchema.serialize(t);
        this.producer.sendAsync(data)
                .exceptionally(this.failureCallback);
    }

    /**
     * Flushes pending messages and releases the producer and the client.
     * Safe to call when {@link #open(int, int)} was never called or did not complete,
     * and safe to call more than once.
     *
     * @throws IOException if flushing or closing the producer or the client fails
     */
    @Override
    public void close() throws IOException {
        final Producer<byte[]> currentProducer = this.producer;
        final PulsarClient currentClient = this.client;
        this.producer = null;
        this.client = null;

        try {
            if (currentProducer != null) {
                // sendAsync() only enqueues the message; flush before closing so that
                // records written just before close() are not dropped.
                currentProducer.flush();
                currentProducer.close();
            }
        } finally {
            if (currentClient != null) {
                currentClient.close();
            }
        }
    }

    /**
     * Builds a client for {@code serviceUrl} and a producer for {@code topicName}.
     * The client is closed again if the producer cannot be created.
     */
    private Producer<byte[]> createPulsarProducer() throws PulsarClientException {
        PulsarClient newClient = null;
        try {
            newClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
            Producer<byte[]> newProducer = Preconditions.checkNotNull(
                    newClient.newProducer().topic(topicName).create(),
                    "Pulsar producer must not be null.");
            this.client = newClient;
            return newProducer;
        } catch (PulsarClientException e) {
            LOG.error("Pulsar producer can not be created.", e);
            closeQuietly(newClient);
            throw e;
        }
    }

    /**
     * Closes a client whose producer could not be created, without masking the original failure.
     */
    private static void closeQuietly(PulsarClient clientToClose) {
        if (clientToClose == null) {
            return;
        }
        try {
            clientToClose.close();
        } catch (PulsarClientException closeFailure) {
            LOG.warn("Pulsar client can not be closed after producer creation failure.", closeFailure);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ protected void checkErroneous() throws Exception {
if (e != null) {
// prevent double throwing
asyncException = null;
throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.batch.connectors.pulsar;

import org.junit.Test;

import static org.junit.Assert.assertNotNull;

/**
* Tests for PulsarOutputFormat
*/
public class PulsarOutputFormatTest {

    private static final String SERVICE_URL = "testServiceUrl";
    private static final String TOPIC_NAME = "testTopic";

    /** A null service URL is rejected with a NullPointerException. */
    @Test(expected = NullPointerException.class)
    public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
        createOutputFormat(null, TOPIC_NAME);
    }

    /** A null topic name is rejected with an IllegalArgumentException. */
    @Test(expected = IllegalArgumentException.class)
    public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() {
        createOutputFormat(SERVICE_URL, null);
    }

    /** A whitespace-only topic name is rejected with an IllegalArgumentException. */
    @Test(expected = IllegalArgumentException.class)
    public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() {
        createOutputFormat(SERVICE_URL, " ");
    }

    /** A null serialization schema is rejected with a NullPointerException. */
    @Test(expected = NullPointerException.class)
    public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
        new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, null);
    }

    /** Valid arguments yield a non-null instance. */
    @Test
    public void testPulsarOutputFormatConstructor() {
        PulsarOutputFormat created = createOutputFormat(SERVICE_URL, TOPIC_NAME);
        assertNotNull(created);
    }

    /**
     * Builds an output format with the given URL and topic and a schema
     * that serializes each element through its string form.
     */
    private static PulsarOutputFormat createOutputFormat(String serviceUrl, String topicName) {
        return new PulsarOutputFormat(serviceUrl, topicName, element -> element.toString().getBytes());
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.batch.connectors.pulsar.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
import org.apache.flink.util.Collector;

/**
* Implements a batch word-count program on Pulsar topic by writing Flink DataSet.
*/
public class FlinkPulsarBatchSinkExample {

    private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
            "Knowledge is limited. Imagination encircles the world.";

    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
    private static final String TOPIC_NAME = "my-flink-topic";

    /**
     * Splits the quote into words, keeps the words longer than four characters,
     * counts the occurrences of each word and writes every count to the Pulsar topic.
     *
     * @param args command line arguments (unused)
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // create PulsarOutputFormat instance; it is typed with the record type so that the
        // output(...) call below is checked by the compiler, and records are encoded with an
        // explicit charset instead of the platform default
        final OutputFormat<WordWithCount> pulsarOutputFormat =
                new PulsarOutputFormat<>(SERVICE_URL, TOPIC_NAME,
                        wordWithCount -> wordWithCount.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));

        // create DataSet
        DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);

        textDS.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] words = value.toLowerCase().split(" ");
                for (String word : words) {
                    out.collect(new WordWithCount(word.replace(".", ""), 1));
                }
            }
        })
        // keep only the words whose length is greater than 4
        .filter(wordWithCount -> wordWithCount.word.length() > 4)
        .groupBy(new KeySelector<WordWithCount, String>() {
            @Override
            public String getKey(WordWithCount wordWithCount) throws Exception {
                return wordWithCount.word;
            }
        })
        .reduce(new ReduceFunction<WordWithCount>() {
            @Override
            public WordWithCount reduce(WordWithCount wordWithCount1, WordWithCount wordWithCount2) throws Exception {
                return new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count);
            }
        })
        // write batch data to Pulsar
        .output(pulsarOutputFormat);

        // execute program
        env.execute("Flink - Pulsar Batch WordCount");

    }

    /**
     * Data type for words with count.
     */
    private static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount { word = " + word + ", count = " + count + " }";
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/) to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html) to Pulsar.

## Prerequisites

To use this sink, include a dependency for the `pulsar-flink` library in your Java configuration.

### Maven

If you're using Maven, add this to your `pom.xml`:

```xml
<!-- in your <properties> block -->
<pulsar.version>{{pulsar:version}}</pulsar.version>

<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-flink</artifactId>
<version>${pulsar.version}</version>
</dependency>
```

### Gradle

If you're using Gradle, add this to your `build.gradle` file:

```groovy
def pulsarVersion = "{{pulsar:version}}"
dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-flink', version: pulsarVersion
}
```

## Usage

Please find a sample usage as follows:

```java
private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
"Knowledge is limited. Imagination encircles the world.";

private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
private static final String TOPIC_NAME = "my-flink-topic";

/**
 * Splits the quote into words, keeps the words longer than four characters
 * and writes each remaining word to the Pulsar topic.
 */
public static void main(String[] args) throws Exception {

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // create PulsarOutputFormat instance; the diamond operator lets the compiler
    // infer the record type from the declared OutputFormat<String>
    final OutputFormat<String> pulsarOutputFormat =
            new PulsarOutputFormat<>(SERVICE_URL, TOPIC_NAME, wordWithCount -> wordWithCount.toString().getBytes());

    // create DataSet
    DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);

    textDS.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            String[] words = value.toLowerCase().split(" ");
            for (String word : words) {
                out.collect(word.replace(".", ""));
            }
        }
    })
    // keep only the words whose length is greater than 4
    .filter(word -> word.length() > 4)

    // write batch data to Pulsar
    .output(pulsarOutputFormat);

    // execute program
    env.execute("Flink - Pulsar Batch WordCount");
}
```

## Sample Output

The sample application above produces the following output:
```
imagination
important
knowledge
knowledge
limited
imagination
encircles
world
```

## Complete Example

You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
In this example, a Flink DataSet is processed as a word count and the result is written to Pulsar.

## Complete Example Output
The complete example linked above produces the following output:
```
WordWithCount { word = important, count = 1 }
WordWithCount { word = encircles, count = 1 }
WordWithCount { word = imagination, count = 2 }
WordWithCount { word = knowledge, count = 2 }
WordWithCount { word = limited, count = 1 }
WordWithCount { word = world, count = 1 }
```
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* and publish to a Pulsar topic. The reason its called Push is
* because PushSources get passed a consumer that they
* invoke whenever they have data to be published to Pulsar.
* The lifcycle of a PushSource is to open it passing any config needed
* The lifecycle of a PushSource is to open it passing any config needed
* by it to initialize(like open network connection, authenticate, etc).
* A consumer is then passed to it, which is invoked by the source whenever
* there is data to be published. Once all data has been read, one can use close
Expand Down

0 comments on commit aefbaac

Please sign in to comment.