[SPARK-15990][YARN] Add rolling log aggregation support for Spark on yarn

## What changes were proposed in this pull request?

YARN has supported rolling log aggregation since 2.6. Previously, logs were only aggregated to HDFS after the application finished, which is quite painful for long-running applications like Spark Streaming or the Thrift server; running out of disk space is also possible when a log file grows too large. This patch therefore adds rolling log aggregation support for Spark on YARN.
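
On the YARN side, the feature is switched on with the monitoring-interval property named in the docs below. A minimal yarn-site.xml sketch, with an illustrative interval (YARN enforces a lower bound on this value, commonly 3600 seconds):

```xml
<!-- yarn-site.xml on the NodeManagers; interval in seconds (illustrative value) -->
<property>
  <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
  <value>3600</value>
</property>
```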

One limitation is that log4j must be switched to a file appender: Spark itself uses a console appender by default, and with console output the log file is not recreated once it has been removed after aggregation. However, most production users have likely changed their log4j configuration away from the default, so this should not be a big problem.
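
A hedged sketch of a log4j.properties that satisfies this requirement (appender name, file name, and sizes are illustrative; `${spark.yarn.app.container.log.dir}` is the container log directory Spark exposes to log4j on YARN):

```properties
# Log to spark.log under the YARN container log dir instead of the console.
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=${spark.yarn.app.container.log.dir}/spark.log
log4j.appender.file.MaxFileSize=50MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```

With this layout, an include pattern of `spark*` picks up `spark.log` together with its rotated siblings.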

## How was this patch tested?

Manually verified with Hadoop 2.7.1.
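
One way to reproduce such a check (the application id is a placeholder): once the monitoring interval has elapsed, the partially aggregated logs of a still-running application can be fetched with the standard YARN CLI:

```
$ yarn logs -applicationId application_1467140000000_0001
```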

Author: jerryshao <[email protected]>

Closes apache#13712 from jerryshao/SPARK-15990.
jerryshao authored and Tom Graves committed Jun 29, 2016
1 parent 393db65 commit 272a2f7
Showing 3 changed files with 67 additions and 0 deletions.
24 changes: 24 additions & 0 deletions docs/running-on-yarn.md
@@ -472,6 +472,30 @@ To use a custom metrics.properties for the application master and executors, upd
Currently supported services are: <code>hive</code>, <code>hbase</code>
</td>
</tr>
<tr>
<td><code>spark.yarn.rolledLog.includePattern</code></td>
<td>(none)</td>
<td>
Java Regex to filter the log files that match the defined include pattern;
matching log files will be aggregated in a rolling fashion.
This is used together with YARN's rolling log aggregation; to enable it on the YARN side,
<code>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</code> should be
configured in yarn-site.xml.
This feature can only be used with Hadoop 2.6.1+. The Spark log4j appender needs to be changed
to use FileAppender or another appender that can handle the files being removed while it is
running. Based on the file name configured in the log4j configuration (like spark.log), the
user should set the regex (spark*) to include all the log files that need to be aggregated.
(A submission sketch follows this table.)
</td>
</tr>
<tr>
<td><code>spark.yarn.rolledLog.excludePattern</code></td>
<td>(none)</td>
<td>
Java Regex to filter the log files that match the defined exclude pattern;
matching log files will not be aggregated in a rolling fashion. If a log file
name matches both the include and the exclude pattern, the file is eventually excluded.
</td>
</tr>
</table>
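
For illustration, a sketch of a cluster-mode submission with both patterns set (the pattern values, class, and jar path are placeholders, not part of this change):

```
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --conf spark.yarn.rolledLog.includePattern="spark*" \
    --conf spark.yarn.rolledLog.excludePattern="*.gz" \
    lib/spark-examples*.jar \
    10
```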

# Important notes
27 changes: 27 additions & 0 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -271,6 +271,33 @@ private[spark] class Client(
        appContext.setResource(capability)
    }

    sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
      try {
        // LogAggregationContext (and its rolled-log setters) only exists in
        // Hadoop 2.6.1+, so it is created and configured reflectively to keep
        // this class compiling against older YARN versions.
        val logAggregationContext = Records.newRecord(
          Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
          .asInstanceOf[Object]

        val setRolledLogsIncludePatternMethod =
          logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
        setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern)

        // The exclude pattern is optional and only applied when an include pattern is set.
        sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
          val setRolledLogsExcludePatternMethod =
            logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String])
          setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern)
        }

        val setLogAggregationContextMethod =
          appContext.getClass.getMethod("setLogAggregationContext",
            Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
        setLogAggregationContextMethod.invoke(appContext, logAggregationContext)
      } catch {
        case NonFatal(e) =>
          // On pre-2.6.1 YARN the record class is missing; degrade gracefully.
          logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
            "does not support it", e)
      }
    }

    appContext
  }
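
For readers compiling directly against Hadoop 2.6.1+, the reflective block above amounts to the following direct calls (a sketch, not part of this commit; pattern values are placeholders):

```scala
import org.apache.hadoop.yarn.api.records.LogAggregationContext
import org.apache.hadoop.yarn.util.Records

// Same effect as the reflective code, with compile-time binding.
val ctx = Records.newRecord(classOf[LogAggregationContext])
ctx.setRolledLogsIncludePattern("spark*")  // e.g. spark.log, spark.log.1, ...
ctx.setRolledLogsExcludePattern("*.gz")    // placeholder exclude pattern
appContext.setLogAggregationContext(ctx)
```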

16 changes: 16 additions & 0 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -243,6 +243,22 @@ package object config {
    .toSequence
    .createWithDefault(Nil)

  /* Rolled log aggregation configuration. */

  private[spark] val ROLLED_LOG_INCLUDE_PATTERN =
    ConfigBuilder("spark.yarn.rolledLog.includePattern")
      .doc("Java Regex to filter the log files that match the defined include pattern; " +
        "matching log files will be aggregated in a rolling fashion.")
      .stringConf
      .createOptional

  private[spark] val ROLLED_LOG_EXCLUDE_PATTERN =
    ConfigBuilder("spark.yarn.rolledLog.excludePattern")
      .doc("Java Regex to filter the log files that match the defined exclude pattern; " +
        "matching log files will not be aggregated in a rolling fashion.")
      .stringConf
      .createOptional

  /* Private configs. */

  private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file")
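
Both entries use `createOptional`, so reads yield an `Option[String]`. A small sketch of the consuming pattern (mirrors the Client.scala hunk above; `sparkConf` assumed in scope):

```scala
// Unset configs read back as None, so the rolled-log setup can be guarded
// with foreach and skipped entirely when the user has not opted in.
val include: Option[String] = sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN)
val exclude: Option[String] = sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN)

include.foreach { pattern =>
  // ... configure rolling aggregation only when an include pattern is set ...
}
```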
