Skip to content

Commit

Permalink
KAFKA-6049: Add auto-repartitioning for cogroup (apache#7792)
Browse files Browse the repository at this point in the history
Follow up to PR apache#7538 (KIP-150)

Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>
  • Loading branch information
wcarlson5 authored and mjsax committed Dec 13, 2019
1 parent ae38315 commit 8b57f6c
Show file tree
Hide file tree
Showing 6 changed files with 759 additions and 26 deletions.
59 changes: 45 additions & 14 deletions docs/streams/developer-guide/dsl-api.html
Original file line number Diff line number Diff line change
Expand Up @@ -640,11 +640,39 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">())</span> <span class="cm">/* value (note: type was modified) */</span>
<span class="o">);</span>
</pre></div>
</div>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Cogroup</strong></p>
<ul class="last simple">
<li>KGroupedStream &rarr; CogroupedKStream</li>
<li>CogroupedKStream &rarr; CogroupedKStream</li>
</ul>
</td>
<td><p class="first">Cogrouping allows to aggregate multiple input streams in a single operation.
The different (already grouped) input streams must have the same key type and may have different values types.
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#cogroup">KGroupedStream#cogroup()</a> creates a new cogrouped stream with a single input stream, while <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html#cogroup">CogroupedKStream#cogroup()</a> adds a grouped stream to an existing cogrouped stream.
A <code>CogroupedKStream</code> may be <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html#windowedBy">windowed</a> before it is <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html#aggregate">aggregated</a>.

<p class="first">Cogroup does not cause a repartition as it has the prerequisite that the input streams are grouped. In the process of creating these groups they will have already been repartitioned if the stream was already marked for repartitioning.</p>
<div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream2</span> <span class="o">=</span> <span class="o">...;</span>

<span class="c1">// Group by the existing key, using the application's configured</span>
<span class="c1">// default serdes for keys and values.</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupByKey</span><span class="o">();</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedStream2</span> <span class="o">=</span> <span class="n">stream2</span><span class="o">.</span><span class="na">groupByKey</span><span class="o">();</span>
<span class="n">CogroupedKStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">cogroupedStream</span> <span class="o">=</span> <span class="n">groupedStream</span><span class="o">.</span><span class="na">cogroup</span><span class="o">(aggregator1)</span>.</span><span class="na">cogroup</span><span class="o">(groupedStream2, aggregator2);</span>

<span class="n">KTable</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">table</span> <span class="o">=</span> <span class="n">cogroupedStream</span><span class="o">.</span><span class="na">aggregate</span><span class="o">(initializer);</span>

<span class="n">KTable</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">table2</span> <span class="o">=</span> <span class="n">cogroupedStream</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(TimeWindows.duration(500ms))</span>.</span><span class="na">aggregate</span><span class="o">(initializer);</span>
</pre></div>
</div>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Map</strong></p>
<tr class="row-odd"><td><p class="first"><strong>Map</strong></p>
<ul class="last simple">
<li>KStream &rarr; KStream</li>
</ul>
Expand Down Expand Up @@ -674,7 +702,7 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
</div>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Map (values only)</strong></p>
<tr class="row-even"><td><p class="first"><strong>Map (values only)</strong></p>
<ul class="last simple">
<li>KStream &rarr; KStream</li>
<li>KTable &rarr; KTable</li>
Expand Down Expand Up @@ -703,7 +731,7 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
</div>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Merge</strong></p>
<tr class="row-odd"><td><p class="first"><strong>Merge</strong></p>
<ul class="last simple">
<li>KStream &rarr; KStream</li>
</ul>
Expand All @@ -726,7 +754,7 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
</div>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Peek</strong></p>
<tr class="row-even"><td><p class="first"><strong>Peek</strong></p>
<ul class="last simple">
<li>KStream &rarr; KStream</li>
</ul>
Expand Down Expand Up @@ -757,7 +785,7 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
</div>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Print</strong></p>
<tr class="row-odd"><td><p class="first"><strong>Print</strong></p>
<ul class="last simple">
<li>KStream &rarr; void</li>
</ul>
Expand All @@ -777,7 +805,7 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
</div>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>SelectKey</strong></p>
<tr class="row-even"><td><p class="first"><strong>SelectKey</strong></p>
<ul class="last simple">
<li>KStream &rarr; KStream</li>
</ul>
Expand Down Expand Up @@ -805,7 +833,7 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
</div>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Table to Stream</strong></p>
<tr class="row-odd"><td><p class="first"><strong>Table to Stream</strong></p>
<ul class="last simple">
<li>KTable &rarr; KStream</li>
</ul>
Expand Down Expand Up @@ -912,14 +940,17 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
<li>KGroupedTable &rarr; KTable</li>
</ul>
</td>
<td><p class="first"><strong>Rolling aggregation.</strong> Aggregates the values of (non-windowed) records by the grouped key.
<td><p class="first"><strong>Rolling aggregation.</strong> Aggregates the values of (non-windowed) records by the grouped key or cogrouped.
Aggregating is a generalization of <code class="docutils literal"><span class="pre">reduce</span></code> and allows, for example, the aggregate value to have a different
type than the input values.
(<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html">KGroupedStream details</a>,
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html">KGroupedTable details</a>)</p>
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html">KGroupedTable details</a>
<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/CogroupedKStream.html">KGroupedTable details</a>)</p>
<p>When aggregating a <em>grouped stream</em>, you must provide an initializer (e.g., <code class="docutils literal"><span class="pre">aggValue</span> <span class="pre">=</span> <span class="pre">0</span></code>) and an &#8220;adder&#8221;
aggregator (e.g., <code class="docutils literal"><span class="pre">aggValue</span> <span class="pre">+</span> <span class="pre">curValue</span></code>). When aggregating a <em>grouped table</em>, you must provide a
&#8220;subtractor&#8221; aggregator (think: <code class="docutils literal"><span class="pre">aggValue</span> <span class="pre">-</span> <span class="pre">oldValue</span></code>).</p>
<p>When aggregating a <em>cogrouped stream</em>, the actual aggregators are provided for each input stream in the prior <code>cogroup()</code>calls, and thus you only need to provide an initializer (e.g., <code class="docutils literal"><span class="pre">aggValue</span> <span class="pre">=</span> <span class="pre">0</span></code>)

<p>Several variants of <code class="docutils literal"><span class="pre">aggregate</span></code> exist, see Javadocs for details.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KGroupedTable</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedTable</span> <span class="o">=</span> <span class="o">...;</span>
Expand Down Expand Up @@ -1009,7 +1040,7 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
<p class="last">See the example at the bottom of this section for a visualization of the aggregation semantics.</p>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Aggregate (windowed)</strong></p>
<tr class="row-even"><td><p class="first"><strong>Aggregate (windowed)</strong></p>
<ul class="last simple">
<li>KGroupedStream &rarr; KTable</li>
</ul>
Expand Down Expand Up @@ -1106,7 +1137,7 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
<p class="last">See the example at the bottom of this section for a visualization of the aggregation semantics.</p>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Count</strong></p>
<tr class="row-odd"><td><p class="first"><strong>Count</strong></p>
<ul class="last simple">
<li>KGroupedStream &rarr; KTable</li>
<li>KGroupedTable &rarr; KTable</li>
Expand Down Expand Up @@ -1137,7 +1168,7 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
</ul>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Count (windowed)</strong></p>
<tr class="row-even"><td><p class="first"><strong>Count (windowed)</strong></p>
<ul class="last simple">
<li>KGroupedStream &rarr; KTable</li>
</ul>
Expand Down Expand Up @@ -1169,7 +1200,7 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
</ul>
</td>
</tr>
<tr class="row-even"><td><p class="first"><strong>Reduce</strong></p>
<tr class="row-odd"><td><p class="first"><strong>Reduce</strong></p>
<ul class="last simple">
<li>KGroupedStream &rarr; KTable</li>
<li>KGroupedTable &rarr; KTable</li>
Expand Down Expand Up @@ -1252,7 +1283,7 @@ <h4><a id="streams_concepts_globalktable" href="#streams_concepts_globalktable">
<p class="last">See the example at the bottom of this section for a visualization of the aggregation semantics.</p>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Reduce (windowed)</strong></p>
<tr class="row-even"><td><p class="first"><strong>Reduce (windowed)</strong></p>
<ul class="last simple">
<li>KGroupedStream &rarr; KTable</li>
</ul>
Expand Down
6 changes: 6 additions & 0 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ <h1>Upgrade Guide and API Changes</h1>
</p>

<h3><a id="streams_api_changes_250" href="#streams_api_changes_250">Streams API changes in 2.5.0</a></h3>
<p>
We add a new <code>cogroup()</code> operator (via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup">KIP-150</a>>)
that allows to aggregate multiple streams in a single operation.
Cogrouped streams can also be windowed before they are aggregated.
We refer to the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html">developer guide</a> for more details.
</p>
<p>
As of 2.5.0 Kafka we deprecated <code>UsePreviousTimeOnInvalidTimestamp</code> and replaced it with <code>UsePartitionTimeOnInvalidTimeStamp</code> as per
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=130028807">KIP-530</a>
Expand Down
1 change: 1 addition & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ <h5><a id="upgrade_250_notable" href="#upgrade_250_notable">Notable changes in 2
<li>For improved resiliency in typical network environments, the default value of
<code>zookeeper.session.timeout.ms</code> has been increased from 6s to 18s and
<code>replica.lag.time.max.ms</code> from 10s to 30s.</li>
<li>New DSL operator <code>cogroup()</code> has been added for aggregating multiple streams together at once</li>
</ul>

<h4><a id="upgrade_2_4_0" href="#upgrade_2_4_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x or 2.3.x to 2.4.0</a></h4>
Expand Down
Loading

0 comments on commit 8b57f6c

Please sign in to comment.