From a1ea5360631f4366ba536fca37d9f906b0e58a5c Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 6 Oct 2017 19:59:03 -0700 Subject: [PATCH] KAFKA-5541: Streams should not re-throw if suspending/closing tasks fails Author: Matthias J. Sax Reviewers: Damian Guy , Bill Bejeck , Guozhang Wang Closes #4037 from mjsax/kafka-5541-dont-rethrow-on-suspend-or-close-2 --- .../processor/internals/AssignedTasks.java | 77 ++++++++++++++----- .../processor/internals/StreamThread.java | 6 +- .../processor/internals/TaskManager.java | 13 +++- 3 files changed, 74 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 6ab807fbacde4..680bbd3f0d35f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -210,7 +210,7 @@ private RuntimeException closeNonRunningTasks(final Collection tasks) { } private RuntimeException suspendTasks(final Collection tasks) { - RuntimeException exception = null; + final AtomicReference firstException = new AtomicReference<>(null); for (Iterator it = tasks.iterator(); it.hasNext(); ) { final Task task = it.next(); try { @@ -218,30 +218,30 @@ private RuntimeException suspendTasks(final Collection tasks) { suspended.put(task.id(), task); } catch (final TaskMigratedException closeAsZombieAndSwallow) { // as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on - closeZombieTask(task); + firstException.compareAndSet(null, closeZombieTask(task)); it.remove(); } catch (final RuntimeException e) { log.error("Suspending {} {} failed due to the following error:", taskTypeName, task.id(), e); + firstException.compareAndSet(null, e); try { task.close(false, false); - } catch (final Exception f) { + } catch (final RuntimeException f) { log.error("After suspending failed, closing the same {} {} failed again due to the following error:", taskTypeName, task.id(), f); } - if (exception == null) { - exception = e; - } } } - return exception; + return firstException.get(); } - private void closeZombieTask(final Task task) { + private RuntimeException closeZombieTask(final Task task) { log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id()); try { task.close(false, true); - } catch (final Exception e) { + } catch (final RuntimeException e) { log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", taskTypeName, task.id(), e.getMessage()); + return e; } + return null; } boolean hasRunningTasks() { @@ -260,7 +260,10 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set try { task.resume(); } catch (final TaskMigratedException e) { - closeZombieTask(task); + final RuntimeException fatalException = closeZombieTask(task); + if (fatalException != null) { + throw fatalException; + } suspended.remove(taskId); throw e; } @@ -402,7 +405,10 @@ int process() { processed++; } } catch (final TaskMigratedException e) { - closeZombieTask(task); + final RuntimeException fatalException = closeZombieTask(task); + if (fatalException != null) { + throw fatalException; + } it.remove(); throw e; } catch (final RuntimeException e) { @@ -429,7 +435,10 @@ int punctuate() { punctuated++; } } catch (final TaskMigratedException e) { - closeZombieTask(task); + final RuntimeException fatalException = closeZombieTask(task); + if (fatalException != null) { + throw fatalException; + } it.remove(); throw e; } catch (final KafkaException e) { @@ -448,7 +457,10 @@ private void applyToRunningTasks(final TaskAction action) { try { action.apply(task); } catch (final TaskMigratedException e) { - closeZombieTask(task); + final RuntimeException fatalException = closeZombieTask(task); + if (fatalException != null) { + throw fatalException; + } it.remove(); if (firstException == null) { firstException = e; @@ -488,20 +500,45 @@ void closeNonAssignedSuspendedTasks(final Map> newAs } void close(final boolean clean) { - close(allTasks(), clean); - clear(); - } - - private void close(final Collection tasks, final boolean clean) { - for (final Task task : tasks) { + final AtomicReference firstException = new AtomicReference<>(null); + for (final Task task : allTasks()) { try { task.close(clean, false); - } catch (final Throwable t) { + } catch (final TaskMigratedException e) { + firstException.compareAndSet(null, closeZombieTask(task)); + } catch (final RuntimeException t) { + firstException.compareAndSet(null, t); log.error("Failed while closing {} {} due to the following error:", task.getClass().getSimpleName(), task.id(), t); + firstException.compareAndSet(null, closeUncleanIfRequired(task, clean)); } } + + clear(); + + final RuntimeException fatalException = firstException.get(); + if (fatalException != null) { + throw fatalException; + } + } + + private RuntimeException closeUncleanIfRequired(final Task task, + final boolean triedToCloseCleanlyPreviously) { + if (triedToCloseCleanlyPreviously) { + log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id()); + try { + task.close(false, false); + } catch (final RuntimeException fatalException) { + log.error("Failed while closing {} {} due to the following error:", + task.getClass().getSimpleName(), + task.id(), + fatalException); + return fatalException; + } + } + + return null; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index ea7d362a858b9..234d25477a186 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1193,7 +1193,11 @@ private void completeShutdown(final boolean cleanRun) { log.info("Shutting down"); - taskManager.shutdown(cleanRun); + try { + taskManager.shutdown(cleanRun); + } catch (final Throwable e) { + log.error("Failed to close task manager due to the following error:", e); + } try { consumer.close(); } catch (final Throwable e) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 53874252bec11..8dc477f82f434 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -206,10 +206,16 @@ void suspendTasksAndState() { } void shutdown(final boolean clean) { + final AtomicReference firstException = new AtomicReference<>(null); + log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(), active.previousTaskIds(), standby.previousTaskIds()); - active.close(clean); + try { + active.close(clean); + } catch (final RuntimeException fatalException) { + firstException.compareAndSet(null, fatalException); + } standby.close(clean); try { threadMetadataProvider.close(); @@ -220,6 +226,11 @@ void shutdown(final boolean clean) { restoreConsumer.assign(Collections.emptyList()); taskCreator.close(); standbyTaskCreator.close(); + + final RuntimeException fatalException = firstException.get(); + if (fatalException != null) { + throw fatalException; + } } Set suspendedActiveTaskIds() {