Skip to content

Commit

Permalink
KAFKA-7027: Add an overload build method in scala (apache#6373)
Browse files Browse the repository at this point in the history
The Java API lets callers pass a Properties object to StreamsBuilder#build to enable, e.g., topology optimization, while the Scala API does not offer this yet. This change adds a Scala overload of build that simply delegates the work to the underlying Java implementation.

Reviewers: John Roesler <[email protected]>,  Bill Bejeck <[email protected]>
  • Loading branch information
massimosiani authored and bbejeck committed Mar 15, 2019
1 parent ee3b9c5 commit 853f24a
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package org.apache.kafka.streams.scala

import java.util.Properties
import java.util.regex.Pattern

import org.apache.kafka.streams.kstream.GlobalKTable
Expand Down Expand Up @@ -183,4 +184,14 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)

/**
 * Returns the `Topology` produced by the wrapped Java builder's no-argument `build()`.
 *
 * @return the `Topology` built by the underlying Java `StreamsBuilder`
 */
def build(): Topology = {
  inner.build()
}

/**
 * Builds the `Topology` that represents the specified processing logic, forwarding the given
 * `Properties` to the underlying Java builder; the properties indicate whether the topology
 * should be optimized or not.
 *
 * @param props the `Properties` consulted when building a possibly optimized topology
 * @return the `Topology` that represents the specified processing logic
 * @see `org.apache.kafka.streams.StreamsBuilder#build`
 */
def build(props: Properties): Topology = {
  inner.build(props)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,31 @@
*/
package org.apache.kafka.streams.scala

import java.time.Duration
import java.util
import java.util.{Locale, Properties}
import java.util.regex.Pattern

import org.apache.kafka.common.serialization.{Serdes => SerdesJ}
import org.apache.kafka.streams.kstream.{
Aggregator,
ForeachAction,
Initializer,
JoinWindows,
KeyValueMapper,
Predicate,
Reducer,
Transformer,
TransformerSupplier,
ValueJoiner,
ValueMapper,
Joined => JoinedJ,
KGroupedStream => KGroupedStreamJ,
KStream => KStreamJ,
KTable => KTableJ
KTable => KTableJ,
Materialized => MaterializedJ
}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, ProcessorSupplier}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream._
Expand Down Expand Up @@ -268,4 +279,164 @@ class TopologyTest extends JUnitSuite {
// should match
assertEquals(getTopologyScala, getTopologyJava)
}

// Checks that building with a `Properties` argument gives the same topology description through
// the Scala wrapper as through the Java API, for both the optimized and the non-optimized
// setting, and that the two settings give different descriptions.
@Test def shouldBuildIdenticalTopologyInJavaNScalaProperties(): Unit = {

  val optimizedProps = new Properties()
  optimizedProps.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE)

  val plainProps = new Properties()
  plainProps.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION)

  val aggregationTopic = "aggregationTopic"
  val reduceTopic = "reduceTopic"
  val joinedTopic = "joinedTopic"

  // Topology expressed through the Scala DSL wrapper. A fresh builder is created on every call.
  def scalaBuilder: StreamsBuilder = {

    val sumOfLengths = (_: String, v: String, agg: Int) => agg + v.length
    val concatenate = (v1: String, v2: String) => v1 + ":" + v2
    val collectedValues: util.List[String] = new util.ArrayList[String]

    val builder: StreamsBuilder = new StreamsBuilder

    val source: KStream[String, String] =
      builder.stream(inputTopic)(Consumed.`with`(Serdes.String, Serdes.String))

    val upperKeyed: KStream[String, String] =
      source.map((k: String, v: String) => (k.toUpperCase(Locale.getDefault), v))
    upperKeyed
      .filter((k: String, _: String) => k == "B")
      .mapValues((v: String) => v.toUpperCase(Locale.getDefault))
      .process(() => new SimpleProcessor(collectedValues))

    val aggregated = upperKeyed.groupByKey
      .aggregate(0)(sumOfLengths)(Materialized.`with`(Serdes.String, Serdes.Integer))
      .toStream
    aggregated.to(aggregationTopic)(Produced.`with`(Serdes.String, Serdes.Integer))

    // extra operators so that the repartition node ends up further downstream
    val reduced = upperKeyed
      .filter((_: String, _: String) => true)
      .peek((k: String, v: String) => System.out.println(k + ":" + v))
      .groupByKey
      .reduce(concatenate)(Materialized.`with`(Serdes.String, Serdes.String))
      .toStream
    reduced.to(reduceTopic)(Produced.`with`(Serdes.String, Serdes.String))

    upperKeyed
      .filter((k: String, _: String) => k == "A")
      .join(aggregated)((v1: String, v2: Int) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
        Joined.`with`(Serdes.String, Serdes.String, Serdes.Integer)
      )
      .to(joinedTopic)

    upperKeyed
      .filter((k: String, _: String) => k == "A")
      .join(reduced)((v1: String, v2: String) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
        Joined.`with`(Serdes.String, Serdes.String, Serdes.String)
      )
      .to(joinedTopic)

    builder
  }

  // The same topology expressed directly against the Java API. A fresh builder on every call.
  def javaBuilder: StreamsBuilderJ = {

    // Predicate that accepts only records whose key equals `expected`.
    def keyIs(expected: String): Predicate[String, String] = new Predicate[String, String] {
      override def test(key: String, value: String): Boolean = key == expected
    }

    val upperCaseKey: KeyValueMapper[String, String, KeyValue[String, String]] =
      new KeyValueMapper[String, String, KeyValue[String, String]] {
        override def apply(key: String, value: String): KeyValue[String, String] =
          KeyValue.pair(key.toUpperCase(Locale.getDefault), value)
      }
    val zero: Initializer[Integer] = new Initializer[Integer] {
      override def apply(): Integer = 0
    }
    val sumOfLengths: Aggregator[String, String, Integer] = new Aggregator[String, String, Integer] {
      override def apply(key: String, value: String, aggregate: Integer): Integer = aggregate + value.length
    }
    val concatenate: Reducer[String] = new Reducer[String] {
      override def apply(v1: String, v2: String): String = v1 + ":" + v2
    }
    val upperCaseValue: ValueMapper[String, String] = new ValueMapper[String, String] {
      override def apply(v: String): String = v.toUpperCase(Locale.getDefault)
    }
    val collectedValues = new util.ArrayList[String]
    val collectingSupplier: ProcessorSupplier[String, String] = new ProcessorSupplier[String, String] {
      override def get() = new SimpleProcessor(collectedValues)
    }
    val joinWithCount: ValueJoiner[String, Integer, String] = new ValueJoiner[String, Integer, String] {
      override def apply(value1: String, value2: Integer): String = value1 + ":" + value2.toString
    }
    val joinWithText: ValueJoiner[String, String, String] = new ValueJoiner[String, String, String] {
      override def apply(value1: String, value2: String): String = value1 + ":" + value2.toString
    }

    val builder = new StreamsBuilderJ

    val source = builder.stream(inputTopic, Consumed.`with`(Serdes.String, Serdes.String))

    val upperKeyed: KStreamJ[String, String] =
      source.map(upperCaseKey)
    upperKeyed
      .filter(keyIs("B"))
      .mapValues[String](upperCaseValue)
      .process(collectingSupplier)

    val aggregated = upperKeyed.groupByKey
      .aggregate(zero, sumOfLengths, MaterializedJ.`with`(Serdes.String, SerdesJ.Integer))
      .toStream
    aggregated.to(aggregationTopic, Produced.`with`(Serdes.String, SerdesJ.Integer))

    // extra operators so that the repartition node ends up further downstream
    val reduced = upperKeyed
      .filter(new Predicate[String, String] {
        override def test(k: String, v: String): Boolean = true
      })
      .peek(new ForeachAction[String, String] {
        override def apply(k: String, v: String): Unit = System.out.println(k + ":" + v)
      })
      .groupByKey
      .reduce(concatenate, MaterializedJ.`with`(Serdes.String, Serdes.String))
      .toStream
    reduced.to(reduceTopic, Produced.`with`(Serdes.String, Serdes.String))

    upperKeyed
      .filter(keyIs("A"))
      .join(aggregated,
            joinWithCount,
            JoinWindows.of(Duration.ofMillis(5000)),
            JoinedJ.`with`(Serdes.String, Serdes.String, SerdesJ.Integer))
      .to(joinedTopic)

    upperKeyed
      .filter(keyIs("A"))
      .join(reduced,
            joinWithText,
            JoinWindows.of(Duration.ofMillis(5000)),
            JoinedJ.`with`(Serdes.String, Serdes.String, SerdesJ.String))
      .to(joinedTopic)

    builder
  }

  // Each call builds from a brand-new builder, so every description is computed independently.
  def scalaDescription(config: Properties): String = scalaBuilder.build(config).describe.toString
  def javaDescription(config: Properties): String = javaBuilder.build(config).describe.toString

  // the two settings must yield different descriptions
  assertNotEquals(scalaDescription(optimizedProps), scalaDescription(plainProps))
  // Scala and Java must agree under either setting
  assertEquals(scalaDescription(plainProps), javaDescription(plainProps))
  assertEquals(scalaDescription(optimizedProps), javaDescription(optimizedProps))
}

/**
 * Processor that appends every value it receives to the supplied list; the key is not used.
 *
 * @param valueList the list that each processed value is added to
 */
private class SimpleProcessor private[TopologyTest] (val valueList: util.List[String])
    extends AbstractProcessor[String, String] {
  override def process(key: String, value: String): Unit = {
    // the Boolean returned by `add` is deliberately discarded
    valueList.add(value)
    ()
  }
}
}

0 comments on commit 853f24a

Please sign in to comment.