Skip to content

Commit

Permalink
KAFKA-5541: Streams should not re-throw if suspending/closing tasks f…
Browse files Browse the repository at this point in the history
…ails

Author: Matthias J. Sax <[email protected]>

Reviewers: Damian Guy <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>

Closes apache#4037 from mjsax/kafka-5541-dont-rethrow-on-suspend-or-close-2
  • Loading branch information
mjsax authored and guozhangwang committed Oct 7, 2017
1 parent e2e8d4a commit a1ea536
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,38 +210,38 @@ private RuntimeException closeNonRunningTasks(final Collection<Task> tasks) {
}

private RuntimeException suspendTasks(final Collection<Task> tasks) {
RuntimeException exception = null;
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
final Task task = it.next();
try {
task.suspend();
suspended.put(task.id(), task);
} catch (final TaskMigratedException closeAsZombieAndSwallow) {
// as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on
closeZombieTask(task);
firstException.compareAndSet(null, closeZombieTask(task));
it.remove();
} catch (final RuntimeException e) {
log.error("Suspending {} {} failed due to the following error:", taskTypeName, task.id(), e);
firstException.compareAndSet(null, e);
try {
task.close(false, false);
} catch (final Exception f) {
} catch (final RuntimeException f) {
log.error("After suspending failed, closing the same {} {} failed again due to the following error:", taskTypeName, task.id(), f);
}
if (exception == null) {
exception = e;
}
}
}
return exception;
return firstException.get();
}

private void closeZombieTask(final Task task) {
private RuntimeException closeZombieTask(final Task task) {
log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id());
try {
task.close(false, true);
} catch (final Exception e) {
} catch (final RuntimeException e) {
log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", taskTypeName, task.id(), e.getMessage());
return e;
}
return null;
}

boolean hasRunningTasks() {
Expand All @@ -260,7 +260,10 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
try {
task.resume();
} catch (final TaskMigratedException e) {
closeZombieTask(task);
final RuntimeException fatalException = closeZombieTask(task);
if (fatalException != null) {
throw fatalException;
}
suspended.remove(taskId);
throw e;
}
Expand Down Expand Up @@ -402,7 +405,10 @@ int process() {
processed++;
}
} catch (final TaskMigratedException e) {
closeZombieTask(task);
final RuntimeException fatalException = closeZombieTask(task);
if (fatalException != null) {
throw fatalException;
}
it.remove();
throw e;
} catch (final RuntimeException e) {
Expand All @@ -429,7 +435,10 @@ int punctuate() {
punctuated++;
}
} catch (final TaskMigratedException e) {
closeZombieTask(task);
final RuntimeException fatalException = closeZombieTask(task);
if (fatalException != null) {
throw fatalException;
}
it.remove();
throw e;
} catch (final KafkaException e) {
Expand All @@ -448,7 +457,10 @@ private void applyToRunningTasks(final TaskAction action) {
try {
action.apply(task);
} catch (final TaskMigratedException e) {
closeZombieTask(task);
final RuntimeException fatalException = closeZombieTask(task);
if (fatalException != null) {
throw fatalException;
}
it.remove();
if (firstException == null) {
firstException = e;
Expand Down Expand Up @@ -488,20 +500,45 @@ void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> newAs
}

void close(final boolean clean) {
close(allTasks(), clean);
clear();
}

private void close(final Collection<Task> tasks, final boolean clean) {
for (final Task task : tasks) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (final Task task : allTasks()) {
try {
task.close(clean, false);
} catch (final Throwable t) {
} catch (final TaskMigratedException e) {
firstException.compareAndSet(null, closeZombieTask(task));
} catch (final RuntimeException t) {
firstException.compareAndSet(null, t);
log.error("Failed while closing {} {} due to the following error:",
task.getClass().getSimpleName(),
task.id(),
t);
firstException.compareAndSet(null, closeUncleanIfRequired(task, clean));
}
}

clear();

final RuntimeException fatalException = firstException.get();
if (fatalException != null) {
throw fatalException;
}
}

private RuntimeException closeUncleanIfRequired(final Task task,
final boolean triedToCloseCleanlyPreviously) {
if (triedToCloseCleanlyPreviously) {
log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
try {
task.close(false, false);
} catch (final RuntimeException fatalException) {
log.error("Failed while closing {} {} due to the following error:",
task.getClass().getSimpleName(),
task.id(),
fatalException);
return fatalException;
}
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,11 @@ private void completeShutdown(final boolean cleanRun) {

log.info("Shutting down");

taskManager.shutdown(cleanRun);
try {
taskManager.shutdown(cleanRun);
} catch (final Throwable e) {
log.error("Failed to close task manager due to the following error:", e);
}
try {
consumer.close();
} catch (final Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,16 @@ void suspendTasksAndState() {
}

void shutdown(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);

log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
active.previousTaskIds(), standby.previousTaskIds());

active.close(clean);
try {
active.close(clean);
} catch (final RuntimeException fatalException) {
firstException.compareAndSet(null, fatalException);
}
standby.close(clean);
try {
threadMetadataProvider.close();
Expand All @@ -220,6 +226,11 @@ void shutdown(final boolean clean) {
restoreConsumer.assign(Collections.<TopicPartition>emptyList());
taskCreator.close();
standbyTaskCreator.close();

final RuntimeException fatalException = firstException.get();
if (fatalException != null) {
throw fatalException;
}
}

Set<TaskId> suspendedActiveTaskIds() {
Expand Down

0 comments on commit a1ea536

Please sign in to comment.