Skip to content

Commit

Permalink
Explain the separate upgrade paths for consumer groups and Streams (a…
Browse files Browse the repository at this point in the history
…pache#7516)

Document the upgrade path for the consumer and for Streams (note that they differ significantly).

Needs to be cherry-picked to 2.4

Reviewers: Guozhang Wang <[email protected]>
  • Loading branch information
ableegoldman authored and guozhangwang committed Oct 16, 2019
1 parent 1d6d370 commit 3e30bf5
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 6 deletions.
17 changes: 11 additions & 6 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ <h1>Upgrade Guide and API Changes</h1>
</div>

<p>
Upgrading from any older version to {{fullDotVersion}} is possible: (1) if you are upgrading from 2.0.x to {{fullDotVersion}} then a single rolling bounce is needed to swap in the new jar,
(2) if you are upgrading from older versions than 2.0.x in the online mode, you would need two rolling bounces where
the first rolling bounce phase you need to set config <code>upgrade.from="older version"</code> (possible values are <code>"0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", and "1.1"</code>)
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>):
Upgrading from any older version to {{fullDotVersion}} is possible: you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
(possible values are <code>"0.10.0" - "2.3"</code>) and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager
rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to
<a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>:
</p>
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to the version from which it is being upgrade.</li>
Expand Down Expand Up @@ -75,11 +75,16 @@ <h1>Upgrade Guide and API Changes</h1>
<h3><a id="streams_api_changes_240" href="#streams_api_changes_240">Streams API changes in 2.4.0</a></h3>
<!-- Placeholder KIP-213 -->
<!-- Placeholder KIP-307 -->
<!-- Placeholder KIP-479 -->

<!-- Placeholder KIP-429 -->
<p>
With the introduction of incremental cooperative rebalancing, Streams no longer requires all tasks be revoked at the beginning of a rebalance. Instead, at the completion of the rebalance only those tasks which are to be migrated to another consumer
for overall load balance will need to be closed and revoked. This changes the semantics of the <code>StateListener</code> a bit, as it will not necessarily transition to <code>REBALANCING</code> at the beginning of a rebalance anymore. Note that
this means IQ will now be available at all times except during state restoration, including while a rebalance is in progress. If restoration is occurring when a rebalance begins, we will continue to actively restore the state stores and/or process
standby tasks during a cooperative rebalance. Note that with this new rebalancing protocol, you may sometimes see a rebalance be followed by a second short rebalance that ensures all tasks are safely distributed. For details on please see
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol">KIP-429</a>.
</p>

<p>
The 2.4.0 release contains newly added and reworked metrics.
Expand Down
25 changes: 25 additions & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,31 @@ <h5><a id="upgrade_240_notable" href="#upgrade_240_notable">Notable changes in 2
The old overloaded functions are deprecated and we would recommend users to make their code changes to leverage the new methods (details
can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-520%3A+Add+overloaded+Consumer%23committed+for+batching+partitions">KIP-520</a>).
</li>
<li>We are introducing incremental cooperative rebalancing to the clients' group protocol, which allows consumers to keep all of their assigned partitions during a rebalance
and at the end revoke only those which must be migrated to another consumer for overall cluster balance. The <code>ConsumerCoordinator</code> will choose the latest <code>RebalanceProtocol</code>
that is commonly supported by all of the consumer's supported assignors. You can use the new built-in <code>CooperativeStickyAssignor</code> or plug in your own custom cooperative assignor. To do
so you must implement the <code>ConsumerPartitionAssignor</code> interface and include <code>RebalanceProtocol.COOPERATIVE</code> in the list returned by <code>ConsumerPartitionAssignor#supportedProtocols</code>.
Your custom assignor can then leverage the <code>ownedPartitions</code> field in each consumer's <code>Subscription</code> to give partitions back to their previous owners whenever possible. Note that when
a partition is to be reassigned to another consumer, it <em>must</em> be removed from the new assignment until it has been revoked from its original owner. Any consumer that has to revoke a partition will trigger
a followup rebalance to allow the revoked partition to safely be assigned to its new owner. See the
<a href="https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html">ConsumerPartitionAssignor RebalanceProtocol javadocs</a> for more information.
<br>
To upgrade from the old (eager) protocol, which always revokes all partitions before rebalancing, to cooperative rebalancing, you must follow a specific upgrade path to get all clients on the same <code>ConsumerPartitionAssignor</code>
that supports the cooperative protocol. This can be done with two rolling bounces, using the <code>CooperativeStickyAssignor</code> for the example: during the first one, add "cooperative-sticky" to the list of supported assignors
for each member (without removing the previous assignor -- note that if previously using the default, you must include that explicitly as well). You then bounce and/or upgrade it.
Once the entire group is on 2.4+ and all members have the "cooperative-sticky" among their supported assignors, remove the other assignor(s) and perform a second rolling bounce so that by the end all members support only the
cooperative protocol. For further details on the cooperative rebalancing protocol and upgrade path, see <a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>.
</li>
<li>There are some behavioral changes to the <code>ConsumerRebalanceListener</code>, as well as a new API. Exceptions thrown during any of the listener's three callbacks will no longer be swallowed, and will instead be re-thrown
all the way up to the <code>Consumer.poll()</code> call. The <code>onPartitionsLost</code> method has been added to allow users to react to abnormal circumstances where a consumer may have lost ownership of its partitions
(such as a missed rebalance) and cannot commit offsets. By default, this will simply call the existing <code>onPartitionsRevoked</code> API to align with previous behavior. Note however that <code>onPartitionsLost</code> will not
be called when the set of lost partitions is empty. This means that no callback will be invoked at the beginning of the first rebalance of a new consumer joining the group.
<br>
The semantics of the <code>ConsumerRebalanceListener's</code> callbacks are further changed when following the cooperative rebalancing protocol described above. In addition to <code>onPartitionsLost</code>, <code>onPartitionsRevoked</code>
will also never be called when the set of revoked partitions is empty. The callback will generally be invoked only at the end of a rebalance, and only on the set of partitions that are being moved to another consumer. The
<code>onPartitionsAssigned</code> callback will however always be called, even with an empty set of partitions, as a way to notify users of a rebalance event (this is true for both cooperative and eager). For details on
the new callback semantics, see the <a href="https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html">ConsumerRebalanceListener javadocs</a>.
</li>
</ul>

<h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0</a></h4>
Expand Down

0 comments on commit 3e30bf5

Please sign in to comment.