[Pulsar-Flink] Add Scala Examples (apache#3071)
* [Pulsar-Flink] Add Scala Examples

* Line break is added for input text.

* Adding ASF Header.

* Fix License format
erenavsarogullari authored and merlimat committed Nov 28, 2018
1 parent 8d24102 commit b413c19
Showing 5 changed files with 260 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pulsar-flink/pom.xml
@@ -51,6 +51,12 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client</artifactId>
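For reference, if these Scala examples were consumed from an sbt build instead of Maven, the flink-scala dependency added above would translate roughly as shown below. This is a hypothetical sketch, not part of this commit; flinkVersion is a placeholder for whatever ${flink.version} resolves to in the parent pom, and %% appends the Scala binary version just as flink-scala_${scala.binary.version} does.

// build.sbt (hypothetical sketch, not part of this commit)

// Placeholder: set to the same value as ${flink.version} in the Maven build.
val flinkVersion = "<flink.version>"

// %% appends the Scala binary version, mirroring flink-scala_${scala.binary.version}.
libraryDependencies += "org.apache.flink" %% "flink-scala" % flinkVersion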
@@ -50,6 +50,7 @@ public static void main(String[] args) throws Exception {
// create DataSet
DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);

// convert sentences to words
textDS.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
@@ -59,23 +60,32 @@ public void flatMap(String value, Collector<WordWithCount> out) throws Exception
}
}
})

// filter words whose length is greater than 4
.filter(wordWithCount -> wordWithCount.word.length() > 4)

// group the words
.groupBy(new KeySelector<WordWithCount, String>() {
@Override
public String getKey(WordWithCount wordWithCount) throws Exception {
return wordWithCount.word;
}
})

// sum the word counts
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount wordWithCount1, WordWithCount wordWithCount2) throws Exception {
return new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count);
}
})

// write batch data to Pulsar
.output(pulsarOutputFormat);

// set parallelism to write to Pulsar in parallel (optional)
env.setParallelism(2);

// execute program
env.execute("Flink - Pulsar Batch WordCount");

78 changes: 78 additions & 0 deletions FlinkPulsarBatchCsvSinkScalaExample.scala
@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.batch.connectors.pulsar.example

import org.apache.flink.api.java.tuple.Tuple4
import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarCsvOutputFormat

/**
* Implements a batch Scala program that writes a Flink DataSet to a Pulsar topic in CSV format.
*/
object FlinkPulsarBatchCsvSinkScalaExample {

/**
* NasaMission Model
*/
private case class NasaMission(id: Int, missionName: String, startYear: Int, endYear: Int)
extends Tuple4(id, missionName, startYear, endYear)

private val SERVICE_URL = "pulsar://127.0.0.1:6650"
private val TOPIC_NAME = "my-flink-topic"

private val nasaMissions = List(
NasaMission(1, "Mercury program", 1959, 1963),
NasaMission(2, "Apollo program", 1961, 1972),
NasaMission(3, "Gemini program", 1963, 1966),
NasaMission(4, "Skylab", 1973, 1974),
NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))

def main(args: Array[String]): Unit = {

// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// create PulsarCsvOutputFormat instance
val pulsarCsvOutputFormat =
new PulsarCsvOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)

// create DataSet
val textDS = env.fromCollection(nasaMissions)

// map NASA mission names to upper case
textDS.map(nasaMission => NasaMission(
nasaMission.id,
nasaMission.missionName.toUpperCase,
nasaMission.startYear,
nasaMission.endYear))

// filter missions which started after 1970
.filter(_.startYear > 1970)

// write batch data to Pulsar as CSV
.output(pulsarCsvOutputFormat)

// set parallelism to write to Pulsar in parallel (optional)
env.setParallelism(2)

// execute program
env.execute("Flink - Pulsar Batch Csv")
}

}
81 changes: 81 additions & 0 deletions FlinkPulsarBatchJsonSinkScalaExample.scala
@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.batch.connectors.pulsar.example

import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarJsonOutputFormat

import scala.beans.BeanProperty

/**
* Implements a batch Scala program that writes a Flink DataSet to a Pulsar topic in JSON format.
*/
object FlinkPulsarBatchJsonSinkScalaExample {

/**
* NasaMission Model
*/
private case class NasaMission(@BeanProperty id: Int,
@BeanProperty missionName: String,
@BeanProperty startYear: Int,
@BeanProperty endYear: Int)

private val nasaMissions = List(
NasaMission(1, "Mercury program", 1959, 1963),
NasaMission(2, "Apollo program", 1961, 1972),
NasaMission(3, "Gemini program", 1963, 1966),
NasaMission(4, "Skylab", 1973, 1974),
NasaMission(5, "Apollo–Soyuz Test Project", 1975, 1975))

private val SERVICE_URL = "pulsar://127.0.0.1:6650"
private val TOPIC_NAME = "my-flink-topic"

def main(args: Array[String]): Unit = {

// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// create PulsarJsonOutputFormat instance
val pulsarJsonOutputFormat = new PulsarJsonOutputFormat[NasaMission](SERVICE_URL, TOPIC_NAME)

// create DataSet
val nasaMissionDS = env.fromCollection(nasaMissions)

// map NASA mission names to upper case
nasaMissionDS.map(nasaMission =>
NasaMission(
nasaMission.id,
nasaMission.missionName.toUpperCase,
nasaMission.startYear,
nasaMission.endYear))

// filter missions which started after 1970
.filter(_.startYear > 1970)

// write batch data to Pulsar
.output(pulsarJsonOutputFormat)

// set parallelism to write to Pulsar in parallel (optional)
env.setParallelism(2)

// execute program
env.execute("Flink - Pulsar Batch Json")
}

}
85 changes: 85 additions & 0 deletions FlinkPulsarBatchSinkScalaExample.scala
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.batch.connectors.pulsar.example

import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.api.scala._
import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat
import org.apache.flink.util.Collector

/**
* Data type for words with count.
*/
case class WordWithCount(word: String, count: Long) {
override def toString: String = "WordWithCount { word = " + word + ", count = " + count + " }"
}

/**
* Implements a batch word-count Scala program that writes a Flink DataSet to a Pulsar topic.
*/
object FlinkPulsarBatchSinkScalaExample {

private val EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
"Knowledge is limited. Imagination encircles the world."
private val SERVICE_URL = "pulsar://127.0.0.1:6650"
private val TOPIC_NAME = "my-flink-topic"

def main(args: Array[String]): Unit = {

// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// create PulsarOutputFormat instance
val pulsarOutputFormat =
new PulsarOutputFormat[WordWithCount](SERVICE_URL, TOPIC_NAME, new SerializationSchema[WordWithCount] {
override def serialize(wordWithCount: WordWithCount): Array[Byte] = wordWithCount.toString.getBytes
})

// create DataSet
val textDS = env.fromElements[String](EINSTEIN_QUOTE)

// convert sentence to words
textDS.flatMap((value: String, out: Collector[WordWithCount]) => {
val words = value.toLowerCase.split(" ")
for (word <- words) {
out.collect(new WordWithCount(word.replace(".", ""), 1))
}
})

// filter words whose length is greater than 4
.filter((wordWithCount: WordWithCount) => wordWithCount.word.length > 4)

// group the words
.groupBy((wordWithCount: WordWithCount) => wordWithCount.word)

// sum the word counts
.reduce((wordWithCount1: WordWithCount, wordWithCount2: WordWithCount) =>
new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count))

// write batch data to Pulsar
.output(pulsarOutputFormat)

// set parallelism to write to Pulsar in parallel (optional)
env.setParallelism(2)

// execute program
env.execute("Flink - Pulsar Batch WordCount")
}

}
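To spot-check what these examples publish, one option is to attach a plain Pulsar consumer to the example topic before running a job. The sketch below is not part of this commit: it assumes the pulsar-client dependency already declared in this module, reuses the same service URL and topic as the examples above, and "flink-example-verifier" is a hypothetical subscription name; only messages published after the subscription exists will be received.

import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import org.apache.pulsar.client.api.PulsarClient

/**
 * Hypothetical verification helper (not part of this commit):
 * subscribes to the example topic and prints whatever the batch jobs publish.
 */
object VerifyFlinkPulsarTopicExample {

  def main(args: Array[String]): Unit = {
    // same service URL as the examples above
    val client = PulsarClient.builder()
      .serviceUrl("pulsar://127.0.0.1:6650")
      .build()

    // "flink-example-verifier" is a hypothetical subscription name
    val consumer = client.newConsumer()
      .topic("my-flink-topic")
      .subscriptionName("flink-example-verifier")
      .subscribe()

    // drain messages until none arrive for 5 seconds
    var msg = consumer.receive(5, TimeUnit.SECONDS)
    while (msg != null) {
      println(new String(msg.getData, StandardCharsets.UTF_8))
      consumer.acknowledge(msg)
      msg = consumer.receive(5, TimeUnit.SECONDS)
    }

    consumer.close()
    client.close()
  }
}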
