Skip to content

Commit

Permalink
[pulsar-flink] Implements a batch program on Pulsar topic by writing …
Browse files Browse the repository at this point in the history
…Flink DataSet as Avro (apache#3205)

Implements a batch program on Pulsar topic by writing Flink DataSet as Avro data Type
  • Loading branch information
ambition119 authored and sijie committed Dec 18, 2018
1 parent 1414edd commit 5493e2f
Show file tree
Hide file tree
Showing 13 changed files with 577 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,6 @@ docker.debug-info

**/website/i18n/*
**/website/translated_docs*

examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/avro/generated
pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/avro/generated
25 changes: 25 additions & 0 deletions examples/flink-consumer-source/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@
</dependencies>

<build>
<!-- Process src/main/resources with Maven resource filtering enabled -->
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -86,6 +93,24 @@
</execution>
</executions>
</plugin>
<!-- Generate Java classes from the Avro schemas in src/main/resources/avro into src/main/java -->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <!-- These are main (not test) sources, so use the main source/output parameters -->
        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.batch.connectors.pulsar.example;


import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission;

import java.util.Arrays;
import java.util.List;

/**
 * Implements a batch program on Pulsar topic by writing Flink DataSet as Avro.
 *
 * <p>The program builds a DataSet from a fixed list of {@link NasaMission} records,
 * upper-cases each mission name, keeps only the missions whose start year is after
 * 1970 and writes the remaining records to the configured Pulsar topic through
 * {@link PulsarAvroOutputFormat}.
 */
public class FlinkPulsarBatchAvroSinkExample {

    /** Input records for the example. */
    private static final List<NasaMission> nasaMissions = Arrays.asList(
            NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build(),
            NasaMission.newBuilder().setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build(),
            NasaMission.newBuilder().setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build(),
            NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
            NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build());

    /** Pulsar service URL handed to the output format. */
    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
    /** Pulsar topic handed to the output format. */
    private static final String TOPIC_NAME = "my-flink-topic";

    /**
     * Runs the batch job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // create PulsarAvroOutputFormat instance
        final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(SERVICE_URL, TOPIC_NAME);

        // create DataSet
        DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
        // map nasa mission names to upper-case
        nasaMissionDS.map(nasaMission -> new NasaMission(
                nasaMission.getId(),
                // toString() first: the generated getter may expose the name as a CharSequence
                nasaMission.getName().toString().toUpperCase(),
                nasaMission.getStartYear(),
                nasaMission.getEndYear()))
                // filter missions which started after 1970
                .filter(nasaMission -> nasaMission.getStartYear() > 1970)
                // write batch data to Pulsar
                .output(pulsarAvroOutputFormat);

        // set parallelism to write Pulsar in parallel (optional)
        env.setParallelism(2);

        // execute program
        env.execute("Flink - Pulsar Batch Avro");
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ world

### Complete Example

You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
In this example, a Flink DataSet is processed as a word count and written to Pulsar.

### Complete Example Output
Expand Down Expand Up @@ -192,7 +192,7 @@ Please find sample output for above application as follows:

### Complete Example

You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkExample.java).
In this example, Flink DataSet is processed and written to Pulsar in Csv format.


Expand Down Expand Up @@ -290,5 +290,67 @@ Please find sample output for above application as follows:

### Complete Example

You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java).
You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkExample.java).
In this example, Flink DataSet is processed and written to Pulsar in Json format.


# PulsarAvroOutputFormat
### Usage

Please find a sample usage as follows:

```java
private static final List<NasaMission> nasaMissions = Arrays.asList(
NasaMission.newBuilder().setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build(),
NasaMission.newBuilder().setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build(),
NasaMission.newBuilder().setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build(),
NasaMission.newBuilder().setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build(),
NasaMission.newBuilder().setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build());

private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
private static final String TOPIC_NAME = "my-flink-topic";

public static void main(String[] args) throws Exception {

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // create PulsarAvroOutputFormat instance
    final OutputFormat<NasaMission> pulsarAvroOutputFormat = new PulsarAvroOutputFormat<>(SERVICE_URL, TOPIC_NAME);

    // create DataSet
    DataSet<NasaMission> nasaMissionDS = env.fromCollection(nasaMissions);
    // map nasa mission names to upper-case
    nasaMissionDS.map(nasaMission -> new NasaMission(
            nasaMission.getId(),
            // toString() first: the generated getter may expose the name as a CharSequence
            nasaMission.getName().toString().toUpperCase(),
            nasaMission.getStartYear(),
            nasaMission.getEndYear()))
            // filter missions which started after 1970
            .filter(nasaMission -> nasaMission.getStartYear() > 1970)
            // write batch data to Pulsar
            .output(pulsarAvroOutputFormat);

    // set parallelism to write Pulsar in parallel (optional)
    env.setParallelism(2);

    // execute program
    env.execute("Flink - Pulsar Batch Avro");
}

```

**Note:** The `NasaMission` class is automatically generated by Avro.

### Sample Output

Please find sample output for above application as follows:
```
"4,SKYLAB,1973,1974"
"5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
```

### Complete Example

You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java).
In this example, Flink DataSet is processed and written to Pulsar in Avro format.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"namespace": "org.apache.flink.batch.connectors.pulsar.avro.generated",
"type": "record",
"name": "NasaMission",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "start_year", "type": ["int", "null"]},
{"name": "end_year", "type": ["int", "null"]}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.batch.connectors.pulsar.example

import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission

/**
 * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Avro.
 *
 * The program builds a DataSet from a fixed list of `NasaMission` records, upper-cases
 * each mission name, keeps only the missions whose start year is after 1970 and writes
 * the remaining records to the configured Pulsar topic through `PulsarAvroOutputFormat`.
 */
object FlinkPulsarBatchAvroSinkScalaExample {

  private val SERVICE_URL = "pulsar://127.0.0.1:6650"
  private val TOPIC_NAME = "my-flink-topic"

  /** Input records for the example. */
  val nasaMissions: List[NasaMission] = List(
    NasaMission.newBuilder.setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build,
    NasaMission.newBuilder.setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build,
    NasaMission.newBuilder.setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build,
    NasaMission.newBuilder.setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build,
    NasaMission.newBuilder.setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build)

  /**
   * Runs the batch job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // create PulsarAvroOutputFormat instance
    val pulsarAvroOutputFormat =
      new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)

    // create DataSet
    val nasaMissionDS = env.fromCollection(nasaMissions)

    // map nasa mission names to upper-case
    nasaMissionDS.map(nasaMission => new NasaMission(
      nasaMission.getId,
      // toString first: the generated getter may expose the name as a CharSequence
      nasaMission.getName.toString.toUpperCase,
      nasaMission.getStartYear,
      nasaMission.getEndYear))
      // filter missions which started after 1970
      .filter(_.getStartYear > 1970)
      // write batch data to Pulsar as Avro
      .output(pulsarAvroOutputFormat)

    // set parallelism to write Pulsar in parallel (optional)
    env.setParallelism(2)

    // execute program
    env.execute("Flink - Pulsar Batch Avro")
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ WordWithCount { word = world, count = 1 }

### Complete Example

You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala).
You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkScalaExample.scala).
In this example, a Flink DataSet is processed as a word count and written to Pulsar.


Expand Down Expand Up @@ -203,7 +203,7 @@ Please find sample output for above application as follows:

### Complete Example

You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala).
You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchCsvSinkScalaExample.scala).
In this example, Flink DataSet is processed and written to Pulsar in Csv format.


Expand Down Expand Up @@ -279,5 +279,67 @@ Please find sample output for above application as follows:

### Complete Example

You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala).
You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchJsonSinkScalaExample.scala).
In this example, Flink DataSet is processed and written to Pulsar in Json format.


# PulsarAvroOutputFormat
### Usage

Please find Scala sample usage of `PulsarAvroOutputFormat` as follows:

```scala
val nasaMissions = List(
NasaMission.newBuilder.setId(1).setName("Mercury program").setStartYear(1959).setEndYear(1963).build,
NasaMission.newBuilder.setId(2).setName("Apollo program").setStartYear(1961).setEndYear(1972).build,
NasaMission.newBuilder.setId(3).setName("Gemini program").setStartYear(1963).setEndYear(1966).build,
NasaMission.newBuilder.setId(4).setName("Skylab").setStartYear(1973).setEndYear(1974).build,
NasaMission.newBuilder.setId(5).setName("Apollo–Soyuz Test Project").setStartYear(1975).setEndYear(1975).build)

def main(args: Array[String]): Unit = {

  // set up the execution environment
  val env = ExecutionEnvironment.getExecutionEnvironment

  // create PulsarAvroOutputFormat instance
  val pulsarAvroOutputFormat =
    new PulsarAvroOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)

  // create DataSet
  val nasaMissionDS = env.fromCollection(nasaMissions)

  // map nasa mission names to upper-case
  nasaMissionDS.map(nasaMission => new NasaMission(
    nasaMission.getId,
    // toString first: the generated getter may expose the name as a CharSequence
    nasaMission.getName.toString.toUpperCase,
    nasaMission.getStartYear,
    nasaMission.getEndYear))
    // filter missions which started after 1970
    .filter(_.getStartYear > 1970)
    // write batch data to Pulsar as Avro
    .output(pulsarAvroOutputFormat)

  // set parallelism to write Pulsar in parallel (optional)
  env.setParallelism(2)

  // execute program
  env.execute("Flink - Pulsar Batch Avro")
}
```

**Note:** The `NasaMission` class is automatically generated by Avro.

### Sample Output

Please find sample output for above application as follows:
```
"4,SKYLAB,1973,1974"
"5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
```

### Complete Example

You can find a complete example [here](https://github.com/apache/pulsar/tree/master/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala).
In this example, Flink DataSet is processed and written to Pulsar in Avro format.
Loading

0 comments on commit 5493e2f

Please sign in to comment.