[pulsar-flink] Add a streaming connector table sink that serializes data in Avro format (apache#3231)

### Motivation

Add a Flink streaming connector for the Avro data format.


### Modifications

Add the `PulsarAvroTableSink` class.

### Result

Add an append-only table sink that serializes data in Avro format.
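
For context, here is a minimal sketch of how the new sink plugs into a Flink Table program. It mirrors the example added in this commit; the service URL and output topic are placeholders, and `WordWithCount` is the Avro-generated class from the example schema.

```java
import org.apache.flink.avro.generated.WordWithCount;
import org.apache.flink.streaming.connectors.pulsar.PulsarAvroTableSink;
import org.apache.flink.table.api.Table;
import org.apache.pulsar.client.api.ProducerConfiguration;

// Inside a Flink Table program, given a registered Table `table`:
PulsarAvroTableSink sink = new PulsarAvroTableSink(
        "pulsar://localhost:6650",          // Pulsar service URL (placeholder)
        "persistent://public/default/wc",   // output topic (placeholder)
        new ProducerConfiguration(),        // producer settings
        "word",                             // field used as the message routing key
        WordWithCount.class);               // Avro-generated record type to serialize
table.writeToSink(sink);                    // append-only: each result row is written as an Avro message
```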
ambition119 authored and sijie committed Dec 24, 2018
1 parent a43ae4d commit 7a1a148
Showing 13 changed files with 661 additions and 7 deletions.
4 changes: 2 additions & 2 deletions .gitignore
@@ -80,5 +80,5 @@ docker.debug-info
**/website/translated_docs*

# Avro
examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/avro/generated
pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/avro/generated
examples/flink-consumer-source/src/main/java/org/apache/flink/avro/generated
pulsar-flink/src/test/java/org/apache/flink/avro/generated
12 changes: 12 additions & 0 deletions examples/flink-consumer-source/pom.xml
@@ -60,6 +60,18 @@
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-flink</artifactId>
@@ -22,8 +22,8 @@
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.avro.generated.NasaMission;
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission;

import java.util.Arrays;
import java.util.List;
@@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.streaming.connectors.pulsar.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.avro.generated.WordWithCount;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.pulsar.PulsarAvroTableSink;
import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.pulsar.client.api.ProducerConfiguration;

/**
* Implements a streaming wordcount program on pulsar topics.
*
* <p>Example usage:
* --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
*/
public class PulsarConsumerSourceWordCountToAvroTableSink {
private static final String ROUTING_KEY = "word";

public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if (parameterTool.getNumberOfParameters() < 2) {
System.out.println("Missing parameters!");
System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
return;
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000);
env.getConfig().setGlobalJobParameters(parameterTool);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);

String serviceUrl = parameterTool.getRequired("service-url");
String inputTopic = parameterTool.getRequired("input-topic");
String subscription = parameterTool.get("subscription", "flink-examples");
String outputTopic = parameterTool.get("output-topic", null);
int parallelism = parameterTool.getInt("parallelism", 1);

System.out.println("Parameters:");
System.out.println("\tServiceUrl:\t" + serviceUrl);
System.out.println("\tInputTopic:\t" + inputTopic);
System.out.println("\tSubscription:\t" + subscription);
System.out.println("\tOutputTopic:\t" + outputTopic);
System.out.println("\tParallelism:\t" + parallelism);

PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
.serviceUrl(serviceUrl)
.topic(inputTopic)
.subscriptionName(subscription);
SourceFunction<String> src = builder.build();
DataStream<String> input = env.addSource(src);


DataStream<WordWithCount> wc = input
.flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
for (String word : line.split("\\s")) {
collector.collect(
WordWithCount.newBuilder().setWord(word).setCount(1).build()
);
}
})
.returns(WordWithCount.class)
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
WordWithCount.newBuilder().setWord(c1.getWord()).setCount(c1.getCount() + c2.getCount()).build()
);

tableEnvironment.registerDataStream("wc", wc);

Table table = tableEnvironment.sqlQuery("select * from wc");
if (null != outputTopic) {
// use the service URL parsed from the command-line arguments for the sink
PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY, WordWithCount.class);
table.writeToSink(sink);
} else {
// no output topic given: write the results to a local CSV file instead
TableSink sink = new CsvTableSink("./examples/file", "|");
table.writeToSink(sink);
}

env.execute("Pulsar Stream WordCount");
}

}
@@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.streaming.connectors.pulsar.example;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.pulsar.PulsarJsonTableSink;
import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.pulsar.client.api.ProducerConfiguration;

/**
* Implements a streaming wordcount program on pulsar topics.
*
* <p>Example usage:
* --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
*/
public class PulsarConsumerSourceWordCountToJsonTableSink {
private static final String ROUTING_KEY = "word";

public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if (parameterTool.getNumberOfParameters() < 2) {
System.out.println("Missing parameters!");
System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
return;
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000);
env.getConfig().setGlobalJobParameters(parameterTool);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);

String serviceUrl = parameterTool.getRequired("service-url");
String inputTopic = parameterTool.getRequired("input-topic");
String subscription = parameterTool.get("subscription", "flink-examples");
String outputTopic = parameterTool.get("output-topic", null);
int parallelism = parameterTool.getInt("parallelism", 1);

System.out.println("Parameters:");
System.out.println("\tServiceUrl:\t" + serviceUrl);
System.out.println("\tInputTopic:\t" + inputTopic);
System.out.println("\tSubscription:\t" + subscription);
System.out.println("\tOutputTopic:\t" + outputTopic);
System.out.println("\tParallelism:\t" + parallelism);

PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
.serviceUrl(serviceUrl)
.topic(inputTopic)
.subscriptionName(subscription);
SourceFunction<String> src = builder.build();
DataStream<String> input = env.addSource(src);


DataStream<WordWithCount> wc = input
.flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
for (String word : line.split("\\s")) {
collector.collect(
new WordWithCount(word, 1)
);
}
})
.returns(WordWithCount.class)
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
new WordWithCount(c1.word, c1.count + c2.count));

tableEnvironment.registerDataStream("wc", wc);

Table table = tableEnvironment.sqlQuery("select * from wc");
if (null != outputTopic) {
// use the service URL parsed from the command-line arguments for the sink
PulsarJsonTableSink sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
table.writeToSink(sink);
} else {
// no output topic given: write the results to a local CSV file instead
TableSink sink = new CsvTableSink("./examples/file", "|");
table.writeToSink(sink);
}

env.execute("Pulsar Stream WordCount");
}

/**
* Data type for words with count.
*/
@AllArgsConstructor
@NoArgsConstructor
@ToString
public static class WordWithCount {

public String word;
public long count;

}
}
@@ -1,4 +1,5 @@
{"namespace": "org.apache.flink.batch.connectors.pulsar.avro.generated",
[
{"namespace": "org.apache.flink.avro.generated",
"type": "record",
"name": "NasaMission",
"fields": [
@@ -7,4 +8,13 @@
{"name": "start_year", "type": ["int", "null"]},
{"name": "end_year", "type": ["int", "null"]}
]
},
{"namespace": "org.apache.flink.avro.generated",
"type": "record",
"name": "WordWithCount",
"fields": [
{"name": "word", "type": "string"},
{"name": "count", "type": "long"}
]
}
]
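
The `WordWithCount` record defined above is instantiated through the Avro builder API generated from this schema; a minimal sketch, assuming code generation places the class under `org.apache.flink.avro.generated` (as the updated `.gitignore` entries suggest):

```java
import org.apache.flink.avro.generated.WordWithCount;

// Build one Avro record for a single word occurrence,
// as the streaming example does inside its flatMap.
WordWithCount wc = WordWithCount.newBuilder()
        .setWord("pulsar")
        .setCount(1L)
        .build();
```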
@@ -19,8 +19,8 @@
package org.apache.flink.batch.connectors.pulsar.example

import org.apache.flink.api.scala._
import org.apache.flink.avro.generated.NasaMission
import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission

/**
* Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Avro.
4 changes: 4 additions & 0 deletions pulsar-flink/pom.xml
@@ -111,6 +111,10 @@
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
</resource>
</resources>

<plugins>