Skip to content

Commit

Permalink
[flink-consumer-source] fix flink streaming connector examples to be…
Browse files Browse the repository at this point in the history
… consistent with batch examples (apache#3265)

fix flink streaming connector examples to be consistent with batch examples
  • Loading branch information
ambition119 authored and sijie committed Dec 31, 2018
1 parent f7af5c8 commit ab39aeb
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@ public static void main(String[] args) throws Exception {
// execute program
env.execute("Flink - Pulsar Batch Avro");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ $ bin/pulsar-client consume -n 0 -s test test_flink_topic

6. Please find sample output for above linked application as follows:
```
"4,SKYLAB,1973,1974"
"5,APOLLO–SOYUZ TEST PROJECT,1975,1975"
```
----- got message -----
Skylab��
----- got message -----
6Apollo–Soyuz Test Project��
```
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,17 @@
*
* <p>Example usage:
* --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
* or
* --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_sub
*/
public class PulsarConsumerSourceWordCountToAvroTableSink {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String ROUTING_KEY = "word";

public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if (parameterTool.getNumberOfParameters() < 2) {
if (parameterTool.getNumberOfParameters() < 3) {
System.out.println("Missing parameters!");
System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
return;
Expand Down Expand Up @@ -96,23 +97,23 @@ public static void main(String[] args) throws Exception {
}
})
.returns(WordWithCount.class)
.keyBy("word")
.keyBy(ROUTING_KEY)
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
WordWithCount.newBuilder().setWord(c1.getWord()).setCount(c1.getCount() + c2.getCount()).build()
);

tableEnvironment.registerDataStream("wc",wc);

Table table = tableEnvironment.sqlQuery("select * from wc");
Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
table.printSchema();
TableSink sink = null;
if (null != outputTopic) {
PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, outputTopic, new ProducerConfiguration(), ROUTING_KEY,WordWithCount.class);
table.writeToSink(sink);
sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY, WordWithCount.class);
} else {
TableSink sink = new CsvTableSink("./examples/file", "|");
// print the results with a csv file
table.writeToSink(sink);
sink = new CsvTableSink("./examples/file", "|");
}
table.writeToSink(sink);

env.execute("Pulsar Stream WordCount");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@
*
* <p>Example usage:
* --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub
* or
* --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_sub
*/
public class PulsarConsumerSourceWordCountToJsonTableSink {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String ROUTING_KEY = "word";

public static void main(String[] args) throws Exception {
// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if (parameterTool.getNumberOfParameters() < 2) {
if (parameterTool.getNumberOfParameters() < 3) {
System.out.println("Missing parameters!");
System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
return;
Expand Down Expand Up @@ -98,22 +99,22 @@ public static void main(String[] args) throws Exception {
}
})
.returns(WordWithCount.class)
.keyBy("word")
.keyBy(ROUTING_KEY)
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
new WordWithCount(c1.word, c1.count + c2.count));

tableEnvironment.registerDataStream("wc",wc);

Table table = tableEnvironment.sqlQuery("select * from wc");
Table table = tableEnvironment.sqlQuery("select word, `count` from wc");
table.printSchema();
TableSink sink = null;
if (null != outputTopic) {
PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
table.writeToSink(sink);
sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
} else {
TableSink sink = new CsvTableSink("./examples/file", "|");
// print the results with a csv file
table.writeToSink(sink);
sink = new CsvTableSink("./examples/file", "|");
}
table.writeToSink(sink);

env.execute("Pulsar Stream WordCount");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ The steps to run the example:
4. Run the word count example to print results to stdout.

```shell
$ ./bin/flink run ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
```

5. Produce messages to topic `test_src`.
Expand All @@ -73,19 +73,16 @@ The steps to run the example:
6. You can check the flink taskexecutor `.out` file. The `.out` file will print the counts at the end of each time window as long as words are floating in, e.g.:

```shell
PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=200)
PulsarConsumerSourceWordCount.WordWithCount(word=again, count=200)
PulsarConsumerSourceWordCount.WordWithCount(word=test, count=200)
PulsarConsumerSourceWordCount.WordWithCount(word=world, count=200)
PulsarConsumerSourceWordCount.WordWithCount(word=hello, count=100)
PulsarConsumerSourceWordCount.WordWithCount(word=again, count=100)
PulsarConsumerSourceWordCount.WordWithCount(word=test, count=100)
PulsarConsumerSourceWordCount.WordWithCount(word=world, count=100)
```

Alternatively, when you run the flink word count example at step 4, you can choose dump the result to another pulsar topic.

```shell
$ ./bin/flink run ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
```

Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.
Expand All @@ -95,3 +92,118 @@ $ bin/pulsar-client consume -n 0 -s test test_dest
```

You will see similar results as what you see at step 6 when running the word count example to print results to stdout.


### PulsarConsumerSourceWordCountToAvroTableSink

This Flink streaming job is consuming from a Pulsar topic and counting the wordcount in a streaming fashion. The job can write the word count results
to csv file or another Pulsar topic for avro format.

The steps to run the example:

Step 1, 2 and 3 are same as above.

4. Run the word count example to print results to stdout.

```shell
$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
```

5. Produce messages to topic `test_src`.

```shell
$ bin/pulsar-client produce -m "hello world again" -n 100 test_src
```

6. You can check the ${FLINK_HOME}/examples/file. The file contains the counts at the end of each time window as long as words are floating in, e.g.:

```file
hello|100
again|100
test|100
world|100
```

Alternatively, when you run the flink word count example at step 4, you can choose dump the result to another pulsar topic.

```shell
$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToAvroTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
```

Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.

```shell
$ bin/pulsar-client consume -n 0 -s test test_dest
```

You will see sample output for above linked application as follows:.
```
----- got message -----

hello�
----- got message -----

again�
----- got message -----
test�
----- got message -----

world�

```
### PulsarConsumerSourceWordCountToJsonTableSink
This Flink streaming job is consuming from a Pulsar topic and counting the wordcount in a streaming fashion. The job can write the word count results
to csv file or another Pulsar topic for json format.
The steps to run the example:
Step 1, 2 and 3 are same as above.
4. Run the word count example to print results to stdout.
```shell
$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub
```
If java.lang.ClassNotFoundException: org.apache.flink.table.sinks.TableSink and java.lang.NoClassDefFoundError: org/apache/flink/formats/json/JsonRowSerializationSchema, you need build Apache Flink from source, then copy flink-table_{version}.jar, flink-json_{version}.jar to ${FLINK_HOME}/lib and restart flink cluster.
5. Produce messages to topic `test_src`.
```shell
$ bin/pulsar-client produce -m "hello world again" -n 100 test_src
```
6. You can check the ${FLINK_HOME}/examples/file. The file contains the counts at the end of each time window as long as words are floating in, e.g.:
```file
hello|100
again|100
test|100
world|100
```
Alternatively, when you run the flink word count example at step 4, you can choose dump the result to another pulsar topic.
```shell
$ ./bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCountToJsonTableSink ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub --output-topic test_dest
```

Once the flink word count example is running, you can use `bin/pulsar-client` to tail the results produced into topic `test_dest`.

```shell
$ bin/pulsar-client consume -n 0 -s test test_dest
```

You will see sample output for above linked application as follows:.
```
----- got message -----
{"word":"hello","count":100}
----- got message -----
{"word":"again","count":100}
----- got message -----
{"word":"test","count":100}
----- got message -----
{"word":"world","count":100}
```
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public void emitDataStream(DataStream<Row> dataStream) {

@Override
public TypeInformation<Row> getOutputType() {
return new RowTypeInfo(fieldTypes, fieldNames);
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
return rowTypeInfo;
}

@Override
Expand Down Expand Up @@ -162,7 +163,7 @@ public AvroKeyExtractor(

@Override
public String getKey(Row event) {
return (String) event.getField(keyIndex);
return event.getField(keyIndex).toString();
}
}

Expand Down

0 comments on commit ab39aeb

Please sign in to comment.