[SPARK-6305][CORE][TEST][FOLLOWUP] Add LoggingSuite and some improvements

### What changes were proposed in this pull request?

This patch adds `LoggingSuite` back and makes a few other improvements. In summary:

1. Add `LoggingSuite` back
2. Refactor logging-related changes based on community suggestions, e.g. let `SparkShellLoggingFilter` inherit from `AbstractFilter` instead of `Filter` (see the sketch after this list)
3. Fix Maven test failures for the hive-thriftserver module
4. Fix K8s decommission integration tests that check log output
5. Update the places in code and docs that still refer to `log4j.properties`
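
A minimal sketch of what item 2 amounts to (shape only, not the actual Spark code; the real filtering logic in `Logging.scala` compares event levels against `Logging.sparkShellThresholdLevel`): with `AbstractFilter`, only the `LogEvent` overload needs an override, whereas implementing `Filter` directly forces stubbing out every `filter(...)` variant.

```scala
import org.apache.logging.log4j.core.{Filter, LogEvent}
import org.apache.logging.log4j.core.filter.AbstractFilter

// Shape of the refactor only: the marker/varargs filter(...) overloads now
// fall back to AbstractFilter's defaults instead of being stubbed out by hand.
class SparkShellLoggingFilter extends AbstractFilter {
  override def filter(event: LogEvent): Filter.Result = Filter.Result.NEUTRAL
}
```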

### Why are the changes needed?

`LoggingSuite` was wrongly removed in a previous PR, so this patch adds it back. A few places in the code can also be simplified, and the places that programmatically write out log4j properties files are changed to log4j2 here.
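
For illustration only (a hypothetical test helper, not code from this patch), the kind of programmatic config generation that changes here moves from log4j 1.x keys to log4j2 keys, roughly:

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical helper: write a minimal log4j2 console config for a test,
// where the old version emitted log4j.rootCategory / log4j.appender.* keys.
def writeLog4j2Conf(dir: File): File = {
  val conf = new File(dir, "log4j2.properties")
  val content =
    """rootLogger.level = info
      |rootLogger.appenderRef.console.ref = console
      |appender.console.type = Console
      |appender.console.name = console
      |appender.console.target = SYSTEM_ERR
      |appender.console.layout.type = PatternLayout
      |appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
      |""".stripMargin
  Files.write(conf.toPath, content.getBytes(StandardCharsets.UTF_8))
  conf
}
```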

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

All tests pass.

Closes apache#34965 from viirya/log4j2_improvement.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
viirya committed Dec 24, 2021
1 parent dd0decf commit 7fd3619
Showing 17 changed files with 324 additions and 328 deletions.
19 changes: 11 additions & 8 deletions R/log4j.properties → R/log4j2.properties
@@ -16,13 +16,16 @@
#

# Set everything to be logged to the file target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=R/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
rootLogger.level = info
rootLogger.appenderRef.file.ref = File

appender.file.type = File
appender.file.name = File
appender.file.fileName = target/unit-tests.log
appender.file.append = true
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
org.eclipse.jetty.LEVEL=WARN
logger.jetty.name = org.eclipse.jetty
logger.jetty.level = warn
4 changes: 2 additions & 2 deletions R/run-tests.sh
@@ -30,9 +30,9 @@ if [[ $(echo $SPARK_AVRO_JAR_PATH | wc -l) -eq 1 ]]; then
fi

if [ -z "$SPARK_JARS" ]; then
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configurationFile=file:$FWDIR/log4j2.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
else
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --jars $SPARK_JARS --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --jars $SPARK_JARS --driver-java-options "-Dlog4j.configurationFile=file:$FWDIR/log4j2.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
fi

FAILED=$((PIPESTATUS[0]||$FAILED))
43 changes: 0 additions & 43 deletions core/src/main/resources/org/apache/spark/log4j-defaults.properties

This file was deleted.

5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -24,7 +24,8 @@ import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
import scala.util.{Failure, Success}

import org.apache.log4j.Logger
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.Logger

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
@@ -282,7 +283,7 @@ private[spark] class ClientApp extends SparkApplication {
if (!conf.contains(RPC_ASK_TIMEOUT)) {
conf.set(RPC_ASK_TIMEOUT, "10s")
}
Logger.getRootLogger.setLevel(driverArgs.logLevel)
LogManager.getRootLogger.asInstanceOf[Logger].setLevel(driverArgs.logLevel)

val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
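
For context, a minimal sketch of the pattern used in the hunk above (assuming the log4j2 core module is on the classpath): the API-level `org.apache.logging.log4j.Logger` returned by `LogManager.getRootLogger` has no `setLevel`, so the level change goes through a cast to the core `Logger`.

```scala
import org.apache.logging.log4j.{Level, LogManager}
import org.apache.logging.log4j.core.Logger

// The cast to the core Logger is what exposes setLevel in log4j2.
val rootLogger = LogManager.getRootLogger.asInstanceOf[Logger]
rootLogger.setLevel(Level.WARN)
```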
@@ -22,7 +22,7 @@ import java.net.{URI, URISyntaxException}
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

import org.apache.log4j.Level
import org.apache.logging.log4j.Level

import org.apache.spark.util.{IntParam, MemoryParam, Utils}

85 changes: 9 additions & 76 deletions core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -19,10 +19,10 @@ package org.apache.spark.internal

import scala.collection.JavaConverters._

import org.apache.logging.log4j.{core, Level, LogManager, Marker}
import org.apache.logging.log4j.{Level, LogManager}
import org.apache.logging.log4j.core.{Filter, LifeCycle, LogEvent, LoggerContext}
import org.apache.logging.log4j.core.appender.ConsoleAppender
import org.apache.logging.log4j.message.Message
import org.apache.logging.log4j.core.filter.AbstractFilter
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder

@@ -125,12 +125,12 @@ trait Logging {
}

private def initializeLogging(isInterpreter: Boolean, silent: Boolean): Unit = {
if (!Logging.isLog4j12()) {
if (Logging.isLog4j2()) {
// If Log4j is used but is not initialized, load a default properties file
val log4j12Initialized = !LogManager.getRootLogger
val log4j2Initialized = !LogManager.getRootLogger
.asInstanceOf[org.apache.logging.log4j.core.Logger].getAppenders.isEmpty
// scalastyle:off println
if (!log4j12Initialized) {
if (!log4j2Initialized) {
Logging.defaultSparkLog4jConfig = true
val defaultLogProps = "org/apache/spark/log4j2-defaults.properties"
Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
@@ -209,7 +209,7 @@ private[spark] object Logging {
* initialization again.
*/
def uninitialize(): Unit = initLock.synchronized {
if (!isLog4j12()) {
if (isLog4j2()) {
if (defaultSparkLog4jConfig) {
defaultSparkLog4jConfig = false
val context = LogManager.getContext(false).asInstanceOf[LoggerContext]
@@ -224,85 +224,18 @@
this.initialized = false
}

private def isLog4j12(): Boolean = {
private def isLog4j2(): Boolean = {
// This distinguishes the log4j 1.2 binding, currently
// org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
// org.apache.logging.slf4j.Log4jLoggerFactory
val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
"org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
"org.apache.logging.slf4j.Log4jLoggerFactory".equals(binderClass)
}


private class SparkShellLoggingFilter extends Filter {
private[spark] class SparkShellLoggingFilter extends AbstractFilter {
private var status = LifeCycle.State.INITIALIZING

override def getOnMismatch: Filter.Result = Filter.Result.ACCEPT

override def getOnMatch: Filter.Result = Filter.Result.ACCEPT

// We don't use this with log4j2 `Marker`, currently all accept.
// If we need it, we should implement it.
override def filter(logger: core.Logger,
level: Level, marker: Marker, msg: String, params: Object*): Filter.Result =
Filter.Result.ACCEPT

override def filter(logger: core.Logger,
level: Level, marker: Marker, message: String, p0: Object): Filter.Result =
Filter.Result.ACCEPT

override def filter(logger: core.Logger,
level: Level, marker: Marker, message: String, p0: Object, p1: Object): Filter.Result =
Filter.Result.ACCEPT

override def filter(logger: core.Logger,
level: Level, marker: Marker, message: String, p0: Object, p1: Object,
p2: Object): Filter.Result = Filter.Result.ACCEPT

override def filter(logger: core.Logger,
level: Level, marker: Marker, message: String, p0: Object, p1: Object,
p2: Object, p3: Object): Filter.Result = Filter.Result.ACCEPT

override def filter(logger: core.Logger,
level: Level, marker: Marker, message: String, p0: Object, p1: Object,
p2: Any, p3: Any, p4: Any): Filter.Result = Filter.Result.ACCEPT

override def filter(logger: core.Logger,
level: Level, marker: Marker, message: String, p0: Object, p1: Object,
p2: Object, p3: Object, p4: Object, p5: Object): Filter.Result =
Filter.Result.ACCEPT

// scalastyle:off
override def filter(logger: core.Logger,
level: Level, marker: Marker, message: String, p0: Object, p1: Object,
p2: Object, p3: Object, p4: Object, p5: Object, p6: Object): Filter.Result =
Filter.Result.ACCEPT

override def filter(logger: core.Logger,
level: Level, marker: Marker, message: String, p0: Object, p1: Object,
p2: Object, p3: Object, p4: Object, p5: Object, p6: Object, p7: Object): Filter.Result =
Filter.Result.ACCEPT

override def filter(logger: core.Logger,
level: Level, marker: Marker, message: String, p0: Object, p1: Object,
p2: Object, p3: Object, p4: Object, p5: Object, p6: Object, p7: Object,
p8: Object): Filter.Result =
Filter.Result.ACCEPT

override def filter(logger: core.Logger,
level: Level, marker: Marker, message: String, p0: Object, p1: Object,
p2: Object, p3: Object, p4: Object, p5: Object, p6: Object, p7: Object,
p8: Object, p9: Object): Filter.Result =
Filter.Result.ACCEPT
// scalastyle:on

override def filter(logger: core.Logger,
level: Level, marker: Marker, msg: Any, t: Throwable): Filter.Result =
Filter.Result.ACCEPT

override def filter(logger: core.Logger,
level: Level, marker: Marker, msg: Message, t: Throwable): Filter.Result =
Filter.Result.ACCEPT

/**
* If sparkShellThresholdLevel is not defined, this filter is a no-op.
* If log level of event is not equal to root level, the event is allowed. Otherwise,
Expand Up @@ -85,7 +85,6 @@ private[spark] class DriverLogger(conf: SparkConf) extends Logging {
val logger = LogManager.getRootLogger().asInstanceOf[Logger]
val fa = logger.getAppenders.get(DriverLogger.APPENDER_NAME)
logger.removeAppender(fa)
fa.stop()
Utils.tryLogNonFatalError(fa.stop())
writer.foreach(_.closeWriter())
} catch {
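
Roughly what wrapping `fa.stop()` in `Utils.tryLogNonFatalError` buys (a simplified sketch of the utility's behavior, not the exact Spark implementation, which logs via `logError` rather than `println`): a failing appender shutdown is reported instead of propagating out of the cleanup path.

```scala
import scala.util.control.NonFatal

// Simplified sketch: run the block and report, rather than rethrow,
// any non-fatal exception.
def tryLogNonFatalError(block: => Unit): Unit = {
  try {
    block
  } catch {
    case NonFatal(t) =>
      println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
  }
}
```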
25 changes: 11 additions & 14 deletions core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -242,17 +242,13 @@ abstract class SparkFunSuite
throw new SparkException(s"Cannot get any logger to add the appender")
}
val restoreLevels = loggers.map(_.getLevel)
loggers.foreach { logger =>
logger match {
case logger: Logger =>
logger.addAppender(appender)
appender.start()
if (level.isDefined) {
logger.setLevel(level.get)
logger.get().setLevel(level.get)
}
case _ =>
throw new SparkException(s"Cannot add appender to logger ${logger.getName}")
loggers.foreach { l =>
val logger = l.asInstanceOf[Logger]
logger.addAppender(appender)
appender.start()
if (level.isDefined) {
logger.setLevel(level.get)
logger.get().setLevel(level.get)
}
}
try f finally {
@@ -272,14 +268,15 @@
val loggingEvents = new ArrayBuffer[LogEvent]()
private var _threshold: Level = Level.INFO

override def append(loggingEvent: LogEvent): Unit = {
if (loggingEvent.getLevel.isMoreSpecificThan(_threshold)) {
override def append(loggingEvent: LogEvent): Unit = loggingEvent.synchronized {
val copyEvent = loggingEvent.toImmutable
if (copyEvent.getLevel.isMoreSpecificThan(_threshold)) {
if (loggingEvents.size >= maxEvents) {
val loggingInfo = if (msg == "") "." else s" while logging $msg."
throw new IllegalStateException(
s"Number of events reached the limit of $maxEvents$loggingInfo")
}
loggingEvents.append(loggingEvent.toImmutable)
loggingEvents.append(copyEvent)
}
}

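
The `toImmutable` copy in `append` matters because log4j2 may reuse the mutable `LogEvent` instance once `append` returns; a self-contained sketch of the same pattern (hypothetical class, not Spark's `LogAppender`) is shown below. In a test it would still need `start()` and `logger.addAppender(...)`, as the hunk above does.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.logging.log4j.core.LogEvent
import org.apache.logging.log4j.core.appender.AbstractAppender
import org.apache.logging.log4j.core.config.Property

// Hypothetical capturing appender: store an immutable copy of each event,
// since the logging framework may recycle the original object.
class CapturingAppender extends AbstractAppender(
    "capture", null, null, true, Property.EMPTY_ARRAY) {
  val events = new ArrayBuffer[LogEvent]()

  override def append(event: LogEvent): Unit = synchronized {
    events += event.toImmutable
  }
}
```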
64 changes: 64 additions & 0 deletions core/src/test/scala/org/apache/spark/internal/LoggingSuite.scala
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal

import org.apache.logging.log4j.{Level, LogManager}
import org.apache.logging.log4j.core.{Filter, Logger}
import org.apache.logging.log4j.core.impl.Log4jLogEvent.Builder
import org.apache.logging.log4j.message.SimpleMessage

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging.SparkShellLoggingFilter
import org.apache.spark.util.Utils

class LoggingSuite extends SparkFunSuite {

test("spark-shell logging filter") {
val ssf = new SparkShellLoggingFilter()
val rootLogger = LogManager.getRootLogger().asInstanceOf[Logger]
val originalLevel = rootLogger.getLevel()
rootLogger.setLevel(Level.INFO)
val originalThreshold = Logging.sparkShellThresholdLevel
Logging.sparkShellThresholdLevel = Level.WARN
try {
val logger1 = LogManager.getLogger("a.b.c.D")
.asInstanceOf[Logger]
val logEvent1 = new Builder().setLevel(Level.INFO)
.setLoggerName(logger1.getName()).setMessage(new SimpleMessage("Test")).build()
// Logger's default level is not null in log4j2, and cannot be set to null too.
assert(ssf.filter(logEvent1) == Filter.Result.NEUTRAL)

// custom log level configured
val parentLogger = LogManager.getLogger("a.b.c")
.asInstanceOf[Logger]
parentLogger.setLevel(Level.INFO)
assert(ssf.filter(logEvent1) == Filter.Result.NEUTRAL)

// log level is greater than or equal to threshold level
val logger2 = LogManager.getLogger("a.b.E")
.asInstanceOf[Logger]
val logEvent2 = new Builder().setLevel(Level.INFO)
.setLoggerName(logger2.getName()).setMessage(new SimpleMessage("Test")).build()
Utils.setLogLevel(Level.INFO)
assert(ssf.filter(logEvent2) != Filter.Result.DENY)
} finally {
rootLogger.setLevel(originalLevel)
Logging.sparkShellThresholdLevel = originalThreshold
}
}
}
8 changes: 4 additions & 4 deletions docs/configuration.md
@@ -28,7 +28,7 @@ Spark provides three locations to configure the system:
system properties.
* [Environment variables](#environment-variables) can be used to set per-machine settings, such as
the IP address, through the `conf/spark-env.sh` script on each node.
* [Logging](#configuring-logging) can be configured through `log4j.properties`.
* [Logging](#configuring-logging) can be configured through `log4j2.properties`.

# Spark Properties

@@ -422,7 +422,7 @@ of the most common options to set are:
<td>%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n</td>
<td>
The layout for the driver logs that are synced to <code>spark.driver.log.dfsDir</code>. If this is not configured,
it uses the layout for the first appender defined in log4j.properties. If that is also not configured, driver logs
it uses the layout for the first appender defined in log4j2.properties. If that is also not configured, driver logs
use the default layout.
</td>
<td>3.0.0</td>
@@ -3080,7 +3080,7 @@ Note: When running Spark on YARN in `cluster` mode, environment variables need t
# Configuring Logging

Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can configure it by adding a
`log4j.properties` file in the `conf` directory. One way to start is to copy the existing
`log4j2.properties` file in the `conf` directory. One way to start is to copy the existing
`log4j2.properties.template` located there.

By default, Spark adds 1 record to the MDC (Mapped Diagnostic Context): `mdc.taskName`, which shows something
@@ -3092,7 +3092,7 @@ The key in MDC will be the string of "mdc.$name".
# Overriding configuration directory

To specify a different configuration directory other than the default "SPARK_HOME/conf",
you can set SPARK_CONF_DIR. Spark will use the configuration files (spark-defaults.conf, spark-env.sh, log4j.properties, etc)
you can set SPARK_CONF_DIR. Spark will use the configuration files (spark-defaults.conf, spark-env.sh, log4j2.properties, etc)
from this directory.

# Inheriting Hadoop Cluster Configuration
@@ -189,7 +189,6 @@ properties+=(
-Dspark.kubernetes.test.jvmImage=$JVM_IMAGE_NAME
-Dspark.kubernetes.test.pythonImage=$PYTHON_IMAGE_NAME
-Dspark.kubernetes.test.rImage=$R_IMAGE_NAME
-Dlog4j.logger.org.apache.spark=DEBUG
)

(