Skip to content

Commit

Permalink
[FLINK-5197] [jm] Ignore outdated JobStatusChanged messages
Browse files Browse the repository at this point in the history
Outdated JobStatusChanged messages no longer trigger a RemoveJob message but are
logged and ignored. This has the advantage, that an outdated JobStatusChanged message
cannot interfere with a recovered job which can have the same job id.
  • Loading branch information
tillrohrmann committed Nov 30, 2016
1 parent 90acbe5 commit ae0975c
Showing 1 changed file with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ class JobManager(
}
}(context.dispatcher)

case JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
case msg @ JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
currentJobs.get(jobID) match {
case Some((executionGraph, jobInfo)) => executionGraph.getJobName

Expand Down Expand Up @@ -911,8 +911,7 @@ class JobManager(
}
}(context.dispatcher)
}
case None =>
self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
case None => log.debug(s"Received $msg for nonexistent job $jobID.")
}

case ScheduleOrUpdateConsumers(jobId, partitionId) =>
Expand Down Expand Up @@ -1077,7 +1076,7 @@ class JobManager(
futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete)
case None =>
}
case None =>
case None => log.debug(s"Tried to remove nonexistent job $jobID.")
}

case RemoveCachedJob(jobID) =>
Expand Down Expand Up @@ -1711,14 +1710,14 @@ class JobManager(
// shutdown to release all resources.
submittedJobGraphs.removeJobGraph(jobID)
} catch {
case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t)
case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
}
}(context.dispatcher))

try {
archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg.archive()))
} catch {
case t: Throwable => log.error(s"Could not archive the execution graph $eg.", t)
case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
}

futureOption
Expand Down

0 comments on commit ae0975c

Please sign in to comment.