Skip to content

Commit

Permalink
[SPARK-2458] Make failed application log visible on History Server
Browse files Browse the repository at this point in the history
Enabled HistoryServer to show incomplete applications.
We can see the log for incomplete applications by clicking the bottom link.

Author: Masayoshi TSUZUKI <[email protected]>

Closes apache#3467 from tsudukim/feature/SPARK-2458-2 and squashes the following commits:

76205d2 [Masayoshi TSUZUKI] Fixed and added test code.
29a04a9 [Masayoshi TSUZUKI] Merge branch 'master' of github.com:tsudukim/spark into feature/SPARK-2458-2
f9ef854 [Masayoshi TSUZUKI] Added space between "if" and "(". Fixed "Incomplete" as capitalized in the web UI. Modified double negative variable name.
9b465b0 [Masayoshi TSUZUKI] Modified typo and better implementation.
3ed8a41 [Masayoshi TSUZUKI] Modified too long lines.
08ea14d [Masayoshi TSUZUKI] [SPARK-2458] Make failed application log visible on History Server
  • Loading branch information
tsudukim authored and Andrew Or committed Jan 7, 2015
1 parent 8fdd489 commit 6e74ede
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ private[spark] case class ApplicationHistoryInfo(
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String)
sparkUser: String,
completed: Boolean = false)

private[spark] abstract class ApplicationHistoryProvider {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,20 +173,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val logInfos = statusList
.filter { entry =>
try {
val isFinishedApplication =
if (isLegacyLogDirectory(entry)) {
fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
} else {
!entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
}

if (isFinishedApplication) {
val modTime = getModificationTime(entry)
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
modTime >= lastModifiedTime
} else {
false
}
val modTime = getModificationTime(entry)
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
modTime >= lastModifiedTime
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
Expand All @@ -204,7 +193,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
None
}
}
.sortBy { info => -info.endTime }
.sortBy { info => (-info.endTime, -info.startTime) }

lastModifiedTime = newLastModifiedTime

Expand Down Expand Up @@ -261,7 +250,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog),
appListener.sparkUser.getOrElse(NOT_STARTED))
appListener.sparkUser.getOrElse(NOT_STARTED),
isApplicationCompleted(eventLog))
} finally {
logInput.close()
}
Expand Down Expand Up @@ -329,6 +319,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
/** Returns the system's mononotically increasing time. */
private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)

/**
* Return true when the application has completed.
*/
private def isApplicationCompleted(entry: FileStatus): Boolean = {
if (isLegacyLogDirectory(entry)) {
fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
} else {
!entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
}
}

}

private object FsHistoryProvider {
Expand All @@ -342,5 +343,6 @@ private class FsApplicationHistoryInfo(
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String)
extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
sparkUser: String,
completed: Boolean = true)
extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
val requestedFirst = (requestedPage - 1) * pageSize
val requestedIncomplete =
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean

val allApps = parent.getApplicationList()
val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))

Expand Down Expand Up @@ -65,25 +67,26 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

<h4>
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
<span style="float: right">
{
if (actualPage > 1) {
<a href={"/?page=" + (actualPage - 1)}>&lt; </a>
<a href={"/?page=1"}>1</a>
}
{if (requestedIncomplete) "(Incomplete applications)"}
<span style="float: right">
{
if (actualPage > 1) {
<a href={makePageLink(actualPage - 1, requestedIncomplete)}>&lt; </a>
<a href={makePageLink(1, requestedIncomplete)}>1</a>
}
{if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
{leftSideIndices}
{actualPage}
{rightSideIndices}
{if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
{
if (actualPage < pageCount) {
<a href={"/?page=" + pageCount}>{pageCount}</a>
<a href={"/?page=" + (actualPage + 1)}> &gt;</a>
}
}
{if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
{leftSideIndices}
{actualPage}
{rightSideIndices}
{if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
{
if (actualPage < pageCount) {
<a href={makePageLink(pageCount, requestedIncomplete)}>{pageCount}</a>
<a href={makePageLink(actualPage + 1, requestedIncomplete)}> &gt;</a>
}
</span>
}
</span>
</h4> ++
appTable
} else {
Expand All @@ -96,6 +99,15 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
</p>
}
}
<a href={makePageLink(actualPage, !requestedIncomplete)}>
{
if (requestedIncomplete) {
"Back to completed applications"
} else {
"Show incomplete applications"
}
}
</a>
</div>
</div>
UIUtils.basicSparkPage(content, "History Server")
Expand All @@ -117,8 +129,9 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
val startTime = UIUtils.formatDate(info.startTime)
val endTime = UIUtils.formatDate(info.endTime)
val duration = UIUtils.formatDuration(info.endTime - info.startTime)
val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-"
val duration =
if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-"
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{info.id}</a></td>
Expand All @@ -130,4 +143,11 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
</tr>
}

private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
"/?" + Array(
"page=" + linkPage,
"showIncomplete=" + showIncomplete
).mkString("&")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
)

// Write an unfinished app, new-style.
writeFile(new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS), true, None,
val logFile2 = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile2, true, None,
SparkListenerApplicationStart("app2-2", None, 1L, "test")
)

Expand Down Expand Up @@ -92,12 +93,17 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers

val list = provider.getListing().toSeq
list should not be (null)
list.size should be (2)
list.size should be (4)
list.count(e => e.completed) should be (2)

list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L,
oldLog.lastModified(), "test"))
oldLog.lastModified(), "test", true))
list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1-1", 1L, 2L,
logFile1.lastModified(), "test"))
logFile1.lastModified(), "test", true))
list(2) should be (ApplicationHistoryInfo(oldLog2.getName(), "app4", 2L, -1L,
oldLog2.lastModified(), "test", false))
list(3) should be (ApplicationHistoryInfo(logFile2.getName(), "app2-2", 1L, -1L,
logFile2.lastModified(), "test", false))

// Make sure the UI can be rendered.
list.foreach { case info =>
Expand Down

0 comments on commit 6e74ede

Please sign in to comment.