Skip to content

Commit

Permalink
Add partition movement details to OngoingExecutionException excepti…
Browse files Browse the repository at this point in the history
…on (linkedin#2126)

## Details

- Provide partition movement details in`OngoingExecutionException` exception message.
- Having partition movement details is handy for troubleshooting in cases when partition movements are ongoing for a long time, providing necessary details for potential mitigation.
  • Loading branch information
mhratson authored Feb 20, 2024
1 parent b858674 commit e4cac5c
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ElectLeadersResult;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -1048,15 +1049,17 @@ private void startExecution(LoadMonitor loadMonitor,
*/
private void sanityCheckOngoingMovement() throws OngoingExecutionException {
boolean hasOngoingPartitionReassignments;
Map<TopicPartition, PartitionReassignment> ongoingPartitionReassignments;
try {
hasOngoingPartitionReassignments = hasOngoingPartitionReassignments();
ongoingPartitionReassignments = ExecutionUtils.ongoingPartitionReassignments(_adminClient);
hasOngoingPartitionReassignments = !ongoingPartitionReassignments.keySet().isEmpty();
} catch (TimeoutException | InterruptedException | ExecutionException e) {
// This may indicate transient (e.g. network) issues.
throw new IllegalStateException("Failed to retrieve if there are already ongoing partition reassignments.", e);
}
// Note that in case there is an ongoing partition reassignment, we do not unpause metric sampling.
if (hasOngoingPartitionReassignments) {
throw new OngoingExecutionException("There are ongoing inter-broker partition movements.");
throw new OngoingExecutionException("There are ongoing inter-broker partition movements: " + ongoingPartitionReassignments);
} else {
boolean hasOngoingIntraBrokerReplicaMovement;
try {
Expand Down

0 comments on commit e4cac5c

Please sign in to comment.