diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index f8f333e2a5ef4..1017ea0cba261 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -35,6 +35,7 @@ import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet, import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry} import com.fasterxml.jackson.databind.ObjectMapper import grizzled.slf4j.Logger +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType} @@ -1335,9 +1336,15 @@ class TaskManager( runningTasks.asScala foreach { case (execID, task) => - val registry = task.getAccumulatorRegistry - val accumulators = registry.getSnapshot - accumulatorEvents.append(accumulators) + try { + val registry = task.getAccumulatorRegistry + val accumulators = registry.getSnapshot + accumulatorEvents.append(accumulators) + } catch { + case e: Exception => + log.warn("Failed to take accumulator snapshot for task {}.", + execID, ExceptionUtils.getRootCause(e)) + } } currentJobManager foreach {