Skip to content

Commit

Permalink
retry rules for failed update steps (etsy#56)
Browse files Browse the repository at this point in the history
* retry rules for failed update steps

* use multi-line strings for long log lines
  • Loading branch information
mchalek authored and Aaron Niskode-Dossett committed Feb 16, 2018
1 parent 764739b commit 58aa61c
Showing 1 changed file with 45 additions and 16 deletions.
61 changes: 45 additions & 16 deletions flowtracker/src/main/scala/FlowTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import java.net.SocketException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.Properties

import scala.util.Try
import scala.collection.mutable
import scala.collection.JavaConversions._

Expand Down Expand Up @@ -131,29 +132,57 @@ class FlowTracker(val flow: Flow[_],
def this(flow: Flow[_], runCompleted: AtomicBoolean) = this(flow, runCompleted, "", false)

override def run(): Unit = {
try {
Try {
initializeTrackedJobState
registerShutdownHook
}.recover {
case t: Throwable =>
LOG.warn("""
Failed to initialize FlowTracker. This is NOT a fatal error.
The job will run as normal, but it will not be tracked by Sahale.
""".stripMargin.trim, t)
runCompleted.set(true)
}

val maxFailures = 10
var numFailures = 0
while(!runCompleted.get) {
Try {
updateSteps
updateFlow
updateAggregates
if (!disableProgressBar) {
logFlowStatus
}
}.recover {
case t: Throwable if 1 + numFailures < maxFailures=>
LOG.warn(s"""
FlowTracker has thrown an exception because an update iteration
failed. This is NOT a fatal error. We will skip this update
iteration. ${maxFailures - numFailures} attempts remain.
""".stripMargin.trim, t)

numFailures += 1

case t: Throwable =>
LOG.warn("""
FlowTracker for this run has thrown an exception. This is NOT a
fatal error. The run will complete as normal, but the remainder
will not be tracked by Sahale.
""".stripMargin.trim, t)
runCompleted.set(true)
}

sleep(REFRESH_INTERVAL_MS)
}

while (!runCompleted.get) {
// Push the final updates, but only if we have not hit our failure limit
if(numFailures < maxFailures) {
Try {
updateSteps
updateFlow
updateAggregates
if (!disableProgressBar) {
logFlowStatus
}
sleep(REFRESH_INTERVAL_MS)
}
} catch {
case t: Throwable => {
LOG.warn("FlowTracker for this run has thrown an exception. " +
"The run will complete as normal, but the remainder will not be tracked.", t)
runCompleted.set(true);
}
} finally {
updateSteps
updateFlow
updateAggregates
}
}

Expand Down

0 comments on commit 58aa61c

Please sign in to comment.