Skip to content

Commit

Permalink
KAFKA-8972 (2.4 blocker): correctly release lost partitions during co…
Browse files Browse the repository at this point in the history
…nsumer.unsubscribe() (apache#7441)

Inside onLeavePrepare we would look into the assignment and try to revoke the owned tasks and notify users via RebalanceListener#onPartitionsRevoked, and then clear the assignment.

However, the subscription's assignment is already cleared in this.subscriptions.unsubscribe(); which means user's rebalance listener would never be triggered. In other words, from consumer client's pov nothing is owned after unsubscribe, but from the user caller's pov the partitions are not revoked yet. For callers like Kafka Streams which rely on the rebalance listener to maintain their internal state, this leads to inconsistent state management and failure cases.

Before KIP-429 this issue is hidden away since every time the consumer re-joins the group later, it would still revoke everything anyways regardless of the passed-in parameters of the rebalance listener; with KIP-429 this is easier to reproduce now.

Our fixes are following:

• Inside unsubscribe, first do onLeavePrepare / maybeLeaveGroup and then subscription.unsubscribe. This we we are guaranteed that the streams' tasks are all closed as revoked by then.
• [Optimization] If the generation is reset due to fatal error from join / hb response etc, then we know that all partitions are lost, and we should not trigger onPartitionRevoked, but instead just onPartitionsLost inside onLeavePrepare. This is because we don't want to commit for lost tracks during rebalance which is doomed to fail as we don't have any generation info.

Reviewers: Matthias J. Sax <[email protected]>, A. Sophie Blee-Goldman <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
Boyang Chen authored and guozhangwang committed Oct 29, 2019
1 parent 56bc507 commit 465f810
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1069,11 +1069,11 @@ public void unsubscribe() {
acquireAndEnsureOpen();
try {
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
this.subscriptions.unsubscribe();
if (this.coordinator != null) {
this.coordinator.onLeavePrepare();
this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
}
this.subscriptions.unsubscribe();
log.info("Unsubscribed all topics or patterns and assigned partitions");
} finally {
release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,12 @@ public void onLeavePrepare() {
Set<TopicPartition> droppedPartitions = new HashSet<>(subscriptions.assignedPartitions());

if (subscriptions.partitionsAutoAssigned() && !droppedPartitions.isEmpty()) {
final Exception e = invokePartitionsRevoked(droppedPartitions);
final Exception e;
if (generation() != Generation.NO_GENERATION) {
e = invokePartitionsRevoked(droppedPartitions);
} else {
e = invokePartitionsLost(droppedPartitions);
}

subscriptions.assignFromSubscribed(Collections.emptySet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ public class KafkaConsumerTest {
private final String groupId = "mock-group";
private final Optional<String> groupInstanceId = Optional.of("mock-instance");

private final String partitionRevoked = "Hit partition revoke ";
private final String partitionAssigned = "Hit partition assign ";
private final String partitionLost = "Hit partition lost ";

private final Collection<TopicPartition> singleTopicPartition = Collections.singleton(new TopicPartition(topic, 0));

@Test
public void testMetricsReporterAutoGeneratedClientId() {
Properties props = new Properties();
Expand Down Expand Up @@ -409,7 +415,7 @@ public void verifyHeartbeatSent() throws Exception {

assertEquals(singleton(tp0), consumer.assignment());

AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator);
AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator, Errors.NONE);

// heartbeat interval is 2 seconds
time.sleep(heartbeatIntervalMs);
Expand Down Expand Up @@ -444,7 +450,7 @@ public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
client.poll(0, time.milliseconds());

client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node);
AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator);
AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator, Errors.NONE);

time.sleep(heartbeatIntervalMs);
Thread.sleep(heartbeatIntervalMs);
Expand Down Expand Up @@ -661,7 +667,6 @@ public void testOffsetIsValidAfterSeek() {
MockClient client = new MockClient(time, metadata);

initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

ConsumerPartitionAssignor assignor = new RoundRobinAssignor();

Expand Down Expand Up @@ -1066,12 +1071,7 @@ public void testSubscriptionChangesWithAutoCommitDisabled() {

KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);

// initial subscription
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));

// verify that subscription has changed but assignment is still unchanged
assertEquals(singleton(topic), consumer.subscription());
assertEquals(Collections.emptySet(), consumer.assignment());
initializeSubscriptionWithSingleTopic(consumer, getConsumerRebalanceListener(consumer));

// mock rebalance responses
prepareRebalance(client, node, assignor, singletonList(tp0), null);
Expand Down Expand Up @@ -1111,6 +1111,71 @@ public void testSubscriptionChangesWithAutoCommitDisabled() {
consumer.close();
}

@Test
public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration() {
Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);

initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);

initializeSubscriptionWithSingleTopic(consumer, getExceptionConsumerRebalanceListener());

prepareRebalance(client, node, assignor, singletonList(tp0), null);

RuntimeException assignmentException = assertThrows(RuntimeException.class,
() -> consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)));
assertEquals(partitionAssigned + singleTopicPartition, assignmentException.getCause().getMessage());

RuntimeException unsubscribeException = assertThrows(RuntimeException.class, consumer::unsubscribe);
assertEquals(partitionRevoked + singleTopicPartition, unsubscribeException.getCause().getMessage());
}

@Test
public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration() throws Exception {
Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);

initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);

initializeSubscriptionWithSingleTopic(consumer, getExceptionConsumerRebalanceListener());
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);

RuntimeException assignException = assertThrows(RuntimeException.class,
() -> consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)));
assertEquals(partitionAssigned + singleTopicPartition, assignException.getCause().getMessage());

AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator, Errors.UNKNOWN_MEMBER_ID);

time.sleep(heartbeatIntervalMs);
TestUtils.waitForCondition(heartbeatReceived::get, "Heartbeat response did not occur within timeout.");

consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
assertTrue(heartbeatReceived.get());

RuntimeException unsubscribeException = assertThrows(RuntimeException.class, consumer::unsubscribe);
assertEquals(partitionLost + singleTopicPartition, unsubscribeException.getCause().getMessage());
}

private void initializeSubscriptionWithSingleTopic(KafkaConsumer<String, String> consumer,
ConsumerRebalanceListener consumerRebalanceListener) {
consumer.subscribe(singleton(topic), consumerRebalanceListener);
// verify that subscription has changed but assignment is still unchanged
assertEquals(singleton(topic), consumer.subscription());
assertEquals(Collections.emptySet(), consumer.assignment());
}

@Test
public void testManualAssignmentChangeWithAutoCommitEnabled() {
Time time = new MockTime();
Expand Down Expand Up @@ -1666,7 +1731,7 @@ public void testRebalanceException() {
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
fail("Should throw exception");
} catch (Throwable e) {
assertEquals("boom!", e.getCause().getMessage());
assertEquals(partitionAssigned + singleTopicPartition, e.getCause().getMessage());
}

// the assignment is still updated regardless of the exception
Expand All @@ -1677,7 +1742,7 @@ public void testRebalanceException() {
consumer.close(Duration.ofMillis(0));
fail("Should throw exception");
} catch (Throwable e) {
assertEquals("boom!", e.getCause().getCause().getMessage());
assertEquals(partitionRevoked + singleTopicPartition, e.getCause().getCause().getMessage());
}

consumer.close(Duration.ofMillis(0));
Expand Down Expand Up @@ -1720,12 +1785,17 @@ private ConsumerRebalanceListener getExceptionConsumerRebalanceListener() {
return new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
throw new RuntimeException("boom!");
throw new RuntimeException(partitionRevoked + partitions);
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
throw new RuntimeException("boom!");
throw new RuntimeException(partitionAssigned + partitions);
}

@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
throw new RuntimeException(partitionLost + partitions);
}
};
}
Expand Down Expand Up @@ -1779,15 +1849,15 @@ private Node prepareRebalance(MockClient client, Node node, ConsumerPartitionAss
return coordinator;
}

private AtomicBoolean prepareHeartbeatResponse(MockClient client, Node coordinator) {
private AtomicBoolean prepareHeartbeatResponse(MockClient client, Node coordinator, Errors error) {
final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
client.prepareResponseFrom(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
heartbeatReceived.set(true);
return true;
}
}, new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(Errors.NONE.code())), coordinator);
}, new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code())), coordinator);
return heartbeatReceived;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,34 @@ public void testIllegalGeneration() {
assertEquals(Collections.singleton(t1p), rebalanceListener.lost);
}

@Test
public void testUnsubscribeWithValidGeneration() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

subscriptions.subscribe(singleton(topic1), rebalanceListener);
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(
new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0])));
coordinator.onJoinComplete(1, "memberId", partitionAssignor.name(), buffer);

coordinator.onLeavePrepare();
assertEquals(1, rebalanceListener.lostCount);
assertEquals(0, rebalanceListener.revokedCount);
}

@Test
public void testUnsubscribeWithInvalidGeneration() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

subscriptions.subscribe(singleton(topic1), rebalanceListener);
subscriptions.assignFromSubscribed(Collections.singletonList(t1p));

coordinator.onLeavePrepare();
assertEquals(1, rebalanceListener.lostCount);
assertEquals(0, rebalanceListener.revokedCount);
}

@Test
public void testUnknownMemberId() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
Expand Down Expand Up @@ -2300,7 +2328,7 @@ public void testConsumerRejoinAfterRebalance() throws Exception {

MockTime time = new MockTime(1);

//onJoinPrepare will be executed and onJoinComplete will not.
// onJoinPrepare will be executed and onJoinComplete will not.
boolean res = coordinator.joinGroupIfNeeded(time.timer(2));

assertFalse(res);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
private final Logger log;

StreamsRebalanceListener(final Time time,
final TaskManager taskManager,
final StreamThread streamThread,
final Logger log) {
final TaskManager taskManager,
final StreamThread streamThread,
final Logger log) {
this.time = time;
this.taskManager = taskManager;
this.streamThread = streamThread;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,20 @@ public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeT
}
}

log.debug("Assigning metadata with: " +
"\tactiveTasks: {},\n" +
"\tstandbyTasks: {}\n" +
"The updated active task states are: \n" +
"\tassignedActiveTasks {},\n" +
"\tassignedStandbyTasks {},\n" +
"\taddedActiveTasks {},\n" +
"\taddedStandbyTasks {},\n" +
"\trevokedActiveTasks {},\n" +
"\trevokedStandbyTasks {}",
activeTasks, standbyTasks,
assignedActiveTasks, assignedStandbyTasks,
addedActiveTasks, addedStandbyTasks,
revokedActiveTasks, revokedStandbyTasks);
this.assignedActiveTasks = activeTasks;
this.assignedStandbyTasks = standbyTasks;
}
Expand Down
14 changes: 11 additions & 3 deletions tests/kafkatest/services/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,20 @@ def start_node(self, node):
class StreamsSmokeTestBaseService(StreamsTestBaseService):
"""Base class for Streams Smoke Test services providing some common settings and functionality"""

def __init__(self, test_context, kafka, command):
def __init__(self, test_context, kafka, command, num_threads = 3):
super(StreamsSmokeTestBaseService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsSmokeTest",
command)
self.NUM_THREADS = num_threads

def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
streams_property.NUM_THREADS: self.NUM_THREADS}

cfg = KafkaConfig(**properties)
return cfg.render()

class StreamsEosTestBaseService(StreamsTestBaseService):
"""Base class for Streams EOS Test services providing some common settings and functionality"""
Expand Down Expand Up @@ -352,8 +360,8 @@ def start_cmd(self, node):
return cmd

class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process")
def __init__(self, test_context, kafka, num_threads = 3):
super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", num_threads)

class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
Expand Down
13 changes: 8 additions & 5 deletions tests/kafkatest/tests/streams/streams_broker_bounce_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def confirm_topics_on_all_brokers(self, expected_topic_set):
return True


def setup_system(self, start_processor=True):
def setup_system(self, start_processor=True, num_threads=3):
# Setup phase
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
Expand All @@ -164,7 +164,7 @@ def setup_system(self, start_processor=True):

# Start test harness
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads)

self.driver.start()

Expand Down Expand Up @@ -207,13 +207,16 @@ def collect_results(self, sleep_time_secs):
@cluster(num_nodes=7)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
broker_type=["leader", "controller"],
num_threads=[1, 3],
sleep_time_secs=[120])
def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs):
def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads):
"""
Start a smoke test client, then kill one particular broker and ensure data is still received
Record if records are delivered.
Record if records are delivered.
We also add a single thread stream client to make sure we could get all partitions reassigned in
next generation so to verify the partition lost is correctly triggered.
"""
self.setup_system()
self.setup_system(num_threads=num_threads)

# Sleep to allow test to run for a bit
time.sleep(sleep_time_secs)
Expand Down

0 comments on commit 465f810

Please sign in to comment.