diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index 584d59fec1bd2..1349dba4cc5dc 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -84,5 +84,5 @@ public static void main(String[] args) throws Exception {
// execute program
env.execute("Flink - Pulsar Batch Avro");
}
-
+
}
diff --git a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
index 84c1bd8a52daf..29adc344e391a 100644
--- a/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
+++ b/examples/flink/src/main/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -211,6 +211,10 @@ $ bin/pulsar-client consume -n 0 -s test test_flink_topic
6. Please find sample output for above linked application as follows:
```
- "4,SKYLAB,1973,1974"
- "5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
-```
+ ----- got message -----
+
+ Skylab��
+ ----- got message -----
+
+ 6Apollo–Soyuz Test Project��
+```
\ No newline at end of file
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
index 7b78da5149fe2..4563b5669679a 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
@@ -42,16 +42,17 @@
*
*
Example usage:
* --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
+ * or
+ * --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
*/
public class PulsarConsumerSourceWordCountToAvroTableSink {
- private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String ROUTING_KEY = "word";
public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
- if (parameterTool.getNumberOfParameters() < 2) {
+ if (parameterTool.getNumberOfParameters() < 3) {
System.out.println("Missing parameters!");
System.out.println("Usage: pulsar --service-url --input-topic --subscription --output-topic ");
return;
@@ -96,23 +97,23 @@ public static void main(String[] args) throws Exception {
}
})
.returns(WordWithCount.class)
- .keyBy("word")
+ .keyBy(ROUTING_KEY)
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction) (c1, c2) ->
WordWithCount.newBuilder().setWord(c1.getWord()).setCount(c1.getCount() + c2.getCount()).build()
);
tableEnvironment.registerDataStream("wc",wc);
-
- Table table = tableEnvironment.sqlQuery("select * from wc");
+ Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
+ table.printSchema();
+ TableSink sink = null;
if (null != outputTopic) {
- PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, outputTopic, new ProducerConfiguration(), ROUTING_KEY,WordWithCount.class);
- table.writeToSink(sink);
+ sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY, WordWithCount.class);
} else {
- TableSink sink = new CsvTableSink("./examples/file", "|");
// print the results with a csv file
- table.writeToSink(sink);
+ sink = new CsvTableSink("./examples/file", "|");
}
+ table.writeToSink(sink);
env.execute("Pulsar Stream WordCount");
}
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
index 95b253675a66d..de09146e5ab48 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
@@ -44,16 +44,17 @@
*
* Example usage:
* --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
+ * or
+ * --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
*/
public class PulsarConsumerSourceWordCountToJsonTableSink {
- private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String ROUTING_KEY = "word";
public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
- if (parameterTool.getNumberOfParameters() < 2) {
+ if (parameterTool.getNumberOfParameters() < 3) {
System.out.println("Missing parameters!");
System.out.println("Usage: pulsar --service-url --input-topic --subscription --output-topic ");
return;
@@ -98,22 +99,22 @@ public static void main(String[] args) throws Exception {
}
})
.returns(WordWithCount.class)
- .keyBy("word")
+ .keyBy(ROUTING_KEY)
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction) (c1, c2) ->
new WordWithCount(c1.word, c1.count + c2.count));
tableEnvironment.registerDataStream("wc",wc);
-
- Table table = tableEnvironment.sqlQuery("select * from wc");
+ Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
+ table.printSchema();
+ TableSink sink = null;
if (null != outputTopic) {
- PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
- table.writeToSink(sink);
+ sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
} else {
- TableSink sink = new CsvTableSink("./examples/file", "|");
// print the results with a csv file
- table.writeToSink(sink);
+ sink = new CsvTableSink("./examples/file", "|");
}
+ table.writeToSink(sink);
env.execute("Pulsar Stream WordCount");
}
diff --git a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
index 7c8c17687e05c..ac36eb5dbc1ef 100644
--- a/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
+++ b/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md
@@ -61,7 +61,7 @@ The steps to run the example:
4. Run the word count example to print results to stdout.
```shell
- $ ./bin/flink run ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+ $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
```
5. Produce messages to topic `test_src`.
@@ -73,19 +73,16 @@ The steps to run the example:
6. You can check the flink taskexecutor `.out` file. The `.out` file will print the counts at the end of each time window as long as words are floating in, e.g.:
```shell
- PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=200)
- PulsarConsumerSourceWordCount.WordWithCount(word=again, count=200)
- PulsarConsumerSourceWordCount.WordWithCount(word=test, count=200)
- PulsarConsumerSourceWordCount.WordWithCount(word=world, count=200)
PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=100)
PulsarConsumerSourceWordCount.WordWithCount(word=again, count=100)
PulsarConsumerSourceWordCount.WordWithCount(word=test, count=100)
+ PulsarConsumerSourceWordCount.WordWithCount(word=world, count=100)
```
Alternatively, when you run the flink word count example at step 4, you can choose dump the result to another pulsar topic.
```shell
-$ ./bin/flink run ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
```
Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
@@ -95,3 +92,118 @@ $ bin/pulsar-client consume -n 0 -s test test_dest
```
You will see similar results as what you see at step 6 when running the word count example to print results to stdout.
+
+
+### PulsarConsumerSourceWordCountToAvroTableSink
+
+This Flink streaming job consumes from a Pulsar topic and counts words in a streaming fashion. The job can write the word count results
+to a CSV file or to another Pulsar topic in Avro format.
+
+The steps to run the example:
+
+Step 1, 2 and 3 are same as above.
+
+4. Run the word count example to write results to a CSV file.
+
+ ```shell
+ $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+ ```
+
+5. Produce messages to topic `test_src`.
+
+ ```shell
+ $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
+ ```
+
+6. Check the file `${FLINK_HOME}/examples/file`. It contains the counts emitted at the end of each time window as long as words keep flowing in, e.g.:
+
+ ```file
+ hello|100
+ again|100
+ test|100
+ world|100
+ ```
+
+Alternatively, when you run the Flink word count example at step 4, you can choose to dump the results to another Pulsar topic.
+
+```shell
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+```
+
+Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_dest
+```
+
+You will see output similar to the following sample:
+```
+----- got message -----
+
+hello�
+----- got message -----
+
+again�
+----- got message -----
+test�
+----- got message -----
+
+world�
+
+```
+
+### PulsarConsumerSourceWordCountToJsonTableSink
+
+This Flink streaming job consumes from a Pulsar topic and counts words in a streaming fashion. The job can write the word count results
+to a CSV file or to another Pulsar topic in JSON format.
+
+The steps to run the example:
+
+Step 1, 2 and 3 are same as above.
+
+4. Run the word count example to write results to a CSV file.
+
+ ```shell
+ $ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
+ ```
+
+If you encounter `java.lang.ClassNotFoundException: org.apache.flink.table.sinks.TableSink` or `java.lang.NoClassDefFoundError: org/apache/flink/formats/json/JsonRowSerializationSchema`, you need to build Apache Flink from source, then copy `flink-table_{version}.jar` and `flink-json_{version}.jar` to `${FLINK_HOME}/lib` and restart the Flink cluster.
+
+5. Produce messages to topic `test_src`.
+
+ ```shell
+ $ bin/pulsar-client produce -m "hello world again" -n 100 test_src
+ ```
+
+6. Check the file `${FLINK_HOME}/examples/file`. It contains the counts emitted at the end of each time window as long as words keep flowing in, e.g.:
+
+ ```file
+ hello|100
+ again|100
+ test|100
+ world|100
+ ```
+
+Alternatively, when you run the Flink word count example at step 4, you can choose to dump the results to another Pulsar topic.
+
+```shell
+$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
+```
+
+Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
+
+```shell
+$ bin/pulsar-client consume -n 0 -s test test_dest
+```
+
+You will see output similar to the following sample:
+```
+----- got message -----
+{"word":"hello","count":100}
+----- got message -----
+{"word":"again","count":100}
+----- got message -----
+{"word":"test","count":100}
+----- got message -----
+{"word":"world","count":100}
+```
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
index 9187a0d71c22c..a7a44121c9fe1 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -99,7 +99,8 @@ public void emitDataStream(DataStream dataStream) {
@Override
public TypeInformation getOutputType() {
- return new RowTypeInfo(fieldTypes, fieldNames);
+ RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
+ return rowTypeInfo;
}
@Override
@@ -162,7 +163,7 @@ public AvroKeyExtractor(
@Override
public String getKey(Row event) {
- return (String) event.getField(keyIndex);
+ return event.getField(keyIndex).toString();
}
}