[SPARK-48629] Migrate the residual code to structured logging framework
### What changes were proposed in this pull request?
This PR migrates the residual (not yet converted) logging calls to the structured logging framework.

### Why are the changes needed?
While reviewing the Spark code, I found that some logs in a few modules had not been fully migrated to the structured logging framework; this PR completes that unfinished work.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passes the existing GitHub Actions (GA) checks.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#46986 from panbingkun/sl_other_followup.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
panbingkun authored and gengliangwang committed Jun 25, 2024
1 parent 5112e58 commit d47f34f
Showing 28 changed files with 118 additions and 101 deletions.
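The diffs below all apply the same mechanical pattern: replace `s"..."`-interpolated log messages with the `log"..."` interpolator provided by `org.apache.spark.internal.Logging`, wrapping each dynamic value in `MDC(LogKeys.<KEY>, value)` so it can be emitted as a named field when structured logging is enabled. A minimal sketch of that pattern, assuming the Spark `common/utils` module is on the classpath (the class and method names here are illustrative, not part of the commit):

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Illustrative class; in the commit the same change is applied in place to
// existing classes such as MavenUtils and SparkFileUtils.
class ArtifactCleaner extends Logging {
  def skipNonJar(artifactId: String): Unit = {
    // Before: plain string interpolation, the value is only part of the message text.
    // logInfo(s"Skipping non-jar dependency $artifactId")

    // After: MDC ties the value to a typed LogKey, so structured output carries it
    // as a separate field while the rendered plain-text message stays the same.
    logInfo(log"Skipping non-jar dependency ${MDC(LogKeys.ARTIFACT_ID, artifactId)}")
  }
}
```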
19 changes: 14 additions & 5 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -80,6 +80,7 @@ private[spark] object LogKeys {
case object APP_STATE extends LogKey
case object ARCHIVE_NAME extends LogKey
case object ARGS extends LogKey
case object ARTIFACTS extends LogKey
case object ARTIFACT_ID extends LogKey
case object ATTRIBUTE_MAP extends LogKey
case object AUTH_ENABLED extends LogKey
@@ -282,6 +283,7 @@ private[spark] object LogKeys {
case object FREE_MEMORY_SIZE extends LogKey
case object FROM_OFFSET extends LogKey
case object FROM_TIME extends LogKey
case object FS_DATA_OUTPUT_STREAM extends LogKey
case object FUNCTION_NAME extends LogKey
case object FUNCTION_PARAM extends LogKey
case object GLOBAL_INIT_FILE extends LogKey
@@ -299,9 +301,8 @@ private[spark] object LogKeys {
case object HIVE_OPERATION_STATE extends LogKey
case object HIVE_OPERATION_TYPE extends LogKey
case object HOST extends LogKey
case object HOSTS extends LogKey
case object HOST_LOCAL_BLOCKS_SIZE extends LogKey
case object HOST_NAME extends LogKey
case object HOST_NAMES extends LogKey
case object HOST_PORT extends LogKey
case object HOST_PORT2 extends LogKey
case object HUGE_METHOD_LIMIT extends LogKey
@@ -337,6 +338,7 @@ private[spark] object LogKeys {
case object KEY2 extends LogKey
case object KEYTAB extends LogKey
case object KEYTAB_FILE extends LogKey
case object KILL_EXECUTORS extends LogKey
case object LABEL_COLUMN extends LogKey
case object LARGEST_CLUSTER_INDEX extends LogKey
case object LAST_ACCESS_TIME extends LogKey
@@ -357,10 +359,10 @@ private[spark] object LogKeys {
case object LOCAL_BLOCKS_SIZE extends LogKey
case object LOCAL_SCRATCH_DIR extends LogKey
case object LOCATION extends LogKey
case object LOGICAL_PLAN extends LogKey
case object LOGICAL_PLAN_COLUMNS extends LogKey
case object LOGICAL_PLAN_LEAVES extends LogKey
case object LOG_ID extends LogKey
case object LOG_KEY_FILE extends LogKey
case object LOG_LEVEL extends LogKey
case object LOG_OFFSET extends LogKey
case object LOG_TYPE extends LogKey
Expand All @@ -385,6 +387,7 @@ private[spark] object LogKeys {
case object MAX_NUM_PARTITIONS extends LogKey
case object MAX_NUM_POSSIBLE_BINS extends LogKey
case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
case object MAX_SERVICE_NAME_LENGTH extends LogKey
case object MAX_SIZE extends LogKey
case object MAX_SLOTS extends LogKey
case object MAX_SPLIT_BYTES extends LogKey
@@ -395,6 +398,7 @@ private[spark] object LogKeys {
case object MEMORY_THRESHOLD_SIZE extends LogKey
case object MERGE_DIR_NAME extends LogKey
case object MESSAGE extends LogKey
case object METADATA extends LogKey
case object METADATA_DIRECTORY extends LogKey
case object METADATA_JSON extends LogKey
case object META_FILE extends LogKey
@@ -541,7 +545,8 @@ private[spark] object LogKeys {
case object OLD_VALUE extends LogKey
case object OPEN_COST_IN_BYTES extends LogKey
case object OPERATION_HANDLE extends LogKey
case object OPERATION_HANDLE_IDENTIFIER extends LogKey
case object OPERATION_HANDLE_ID extends LogKey
case object OPERATION_ID extends LogKey
case object OPTIMIZED_PLAN_COLUMNS extends LogKey
case object OPTIMIZER_CLASS_NAME extends LogKey
case object OPTIONS extends LogKey
@@ -583,6 +588,7 @@ private[spark] object LogKeys {
case object POST_SCAN_FILTERS extends LogKey
case object PREDICATE extends LogKey
case object PREDICATES extends LogKey
case object PREFERRED_SERVICE_NAME extends LogKey
case object PREFIX extends LogKey
case object PRETTY_ID_STRING extends LogKey
case object PRINCIPAL extends LogKey
@@ -613,6 +619,7 @@ private[spark] object LogKeys {
case object RANGE extends LogKey
case object RATE_LIMIT extends LogKey
case object RATIO extends LogKey
case object RDD extends LogKey
case object RDD_CHECKPOINT_DIR extends LogKey
case object RDD_DEBUG_STRING extends LogKey
case object RDD_DESCRIPTION extends LogKey
@@ -641,6 +648,7 @@ private[spark] object LogKeys {
case object REMOVE_FROM_MASTER extends LogKey
case object REPORT_DETAILS extends LogKey
case object REQUESTER_SIZE extends LogKey
case object REQUEST_EXECUTORS extends LogKey
case object REQUEST_ID extends LogKey
case object RESOURCE extends LogKey
case object RESOURCE_NAME extends LogKey
@@ -679,6 +687,7 @@ private[spark] object LogKeys {
case object SESSION_KEY extends LogKey
case object SET_CLIENT_INFO_REQUEST extends LogKey
case object SHARD_ID extends LogKey
case object SHORTER_SERVICE_NAME extends LogKey
case object SHORT_USER_NAME extends LogKey
case object SHUFFLE_BLOCK_INFO extends LogKey
case object SHUFFLE_DB_BACKEND_KEY extends LogKey
@@ -756,7 +765,6 @@ private[spark] object LogKeys {
case object TASK_ATTEMPT_ID extends LogKey
case object TASK_ID extends LogKey
case object TASK_INDEX extends LogKey
case object TASK_INFO_ID extends LogKey
case object TASK_LOCALITY extends LogKey
case object TASK_NAME extends LogKey
case object TASK_REQUIREMENTS extends LogKey
@@ -835,6 +843,7 @@ private[spark] object LogKeys {
case object WORKER_PORT extends LogKey
case object WORKER_URL extends LogKey
case object WRITE_AHEAD_LOG_INFO extends LogKey
case object WRITE_AHEAD_LOG_RECORD_HANDLE extends LogKey
case object WRITE_JOB_UUID extends LogKey
case object XML_SCHEDULING_MODE extends LogKey
case object XSD_PATH extends LogKey
@@ -36,7 +36,7 @@ import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.util.ArrayImplicits._

/** Provides utility functions to be used inside SparkSubmit. */
@@ -215,7 +215,7 @@ private[spark] object MavenUtils extends Logging {
if (artifactInfo.getExt == "jar") {
true
} else {
logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
logInfo(log"Skipping non-jar dependency ${MDC(LogKeys.ARTIFACT_ID, artifactInfo.getId)}")
false
}
}
@@ -515,8 +515,9 @@ private[spark] object MavenUtils extends Logging {
val failedReports = rr.getArtifactsReports(DownloadStatus.FAILED, true)
if (failedReports.nonEmpty && noCacheIvySettings.isDefined) {
val failedArtifacts = failedReports.map(r => r.getArtifact)
logInfo(s"Download failed: ${failedArtifacts.mkString("[", ", ", "]")}, " +
s"attempt to retry while skipping local-m2-cache.")
logInfo(log"Download failed: " +
log"${MDC(LogKeys.ARTIFACTS, failedArtifacts.mkString("[", ", ", "]"))}, " +
log"attempt to retry while skipping local-m2-cache.")
failedArtifacts.foreach(artifact => {
clearInvalidIvyCacheFiles(artifact.getModuleRevisionId, ivySettings.getDefaultCache)
})
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets.UTF_8

import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}

private[spark] trait SparkErrorUtils extends Logging {
/**
@@ -74,7 +74,8 @@ private[spark] trait SparkErrorUtils extends Logging {
} catch {
case t: Throwable if (originalThrowable != null && originalThrowable != t) =>
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in finally: ${t.getMessage}", t)
logWarning(
log"Suppressing exception in finally: ${MDC(LogKeys.MESSAGE, t.getMessage)}", t)
throw originalThrowable
}
}
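Several hunks in this commit (MavenUtils above, and the Spark Connect listener handler further down) split a long message across multiple `log"..."` fragments joined with `+`; each fragment keeps its MDC context, and the `logWarning(message, throwable)` overload works unchanged. A standalone sketch of that shape, with illustrative class and parameter names:

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Illustrative class showing a multi-fragment message plus a Throwable argument,
// mirroring the MavenUtils and SparkErrorUtils changes in this commit.
class DownloadReporter extends Logging {
  def reportFailure(failedArtifacts: Seq[String], cause: Throwable): Unit = {
    logWarning(log"Download failed: " +
      log"${MDC(LogKeys.ARTIFACTS, failedArtifacts.mkString("[", ", ", "]"))}, " +
      log"attempt to retry while skipping local-m2-cache.", cause)
  }
}
```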
@@ -20,7 +20,7 @@ import java.io.File
import java.net.{URI, URISyntaxException}
import java.nio.file.Files

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.network.util.JavaUtils

private[spark] trait SparkFileUtils extends Logging {
@@ -77,12 +77,12 @@ private[spark] trait SparkFileUtils extends Logging {
// remove the check when we're sure that Files.createDirectories() would never fail silently.
Files.createDirectories(dir.toPath)
if ( !dir.exists() || !dir.isDirectory) {
logError(s"Failed to create directory " + dir)
logError(log"Failed to create directory ${MDC(LogKeys.PATH, dir)}")
}
dir.isDirectory
} catch {
case e: Exception =>
logError(s"Failed to create directory " + dir, e)
logError(log"Failed to create directory ${MDC(LogKeys.PATH, dir)}", e)
false
}
}
@@ -145,13 +145,11 @@ class StreamingQueryListenerBus(sparkSession: SparkSession) extends Logging {
case t: QueryTerminatedEvent =>
listener.onQueryTerminated(t)
case _ =>
logWarning(
log"Unknown StreamingQueryListener event: " +
log"${MDC(LogKeys.EVENT, event)}")
logWarning(log"Unknown StreamingQueryListener event: ${MDC(LogKeys.EVENT, event)}")
}
} catch {
case e: Exception =>
logWarning(s"Listener $listener threw an exception", e)
logWarning(log"Listener ${MDC(LogKeys.LISTENER, listener)} threw an exception", e)
})
}
}
@@ -57,9 +57,10 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
case StreamingQueryListenerBusCommand.CommandCase.ADD_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
case true =>
logWarning(
s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] Redundant server side listener added. Exiting.")
logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
log"Redundant server side listener added. Exiting.")
return
case false =>
// This transfers sending back the response to the client until
Expand All @@ -86,36 +87,35 @@ class SparkConnectStreamingQueryListenerHandler(executeHolder: ExecuteHolder) ex
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: " +
log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " +
log"${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
log"Error sending listener added response.",
e)
listenerHolder.cleanUp()
return
}
}
logInfo(
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
log"${MDC(LogKeys.USER_ID, userId)}][operationId: " +
log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " +
log"Server side listener added. Now blocking until all client side listeners are " +
log"removed or there is error transmitting the event back.")
logInfo(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
log"Server side listener added. Now blocking until " +
log"all client side listeners are removed or there is error transmitting the event back.")
// Block the handling thread, and have serverListener continuously send back new events
listenerHolder.streamingQueryListenerLatch.await()
logInfo(
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}][UserId: " +
log"${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: " +
log"${MDC(LogKeys.OPERATION_HANDLE_IDENTIFIER, executeHolder.operationId)}] " +
log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
log"Server side listener long-running handling thread ended.")
case StreamingQueryListenerBusCommand.CommandCase.REMOVE_LISTENER_BUS_LISTENER =>
listenerHolder.isServerSideListenerRegistered match {
case true =>
sessionHolder.streamingServersideListenerHolder.cleanUp()
case false =>
logWarning(
s"[SessionId: $sessionId][UserId: $userId][operationId: " +
s"${executeHolder.operationId}] No active server side listener bus listener " +
s"but received remove listener call. Exiting.")
logWarning(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, userId)}]" +
log"[operationId: ${MDC(LogKeys.OPERATION_HANDLE_ID, executeHolder.operationId)}] " +
log"No active server side listener bus listener but received remove listener call. " +
log"Exiting.")
return
}
case StreamingQueryListenerBusCommand.CommandCase.COMMAND_NOT_SET =>
@@ -103,7 +103,7 @@ private[kinesis] class KinesisCheckpointer(
}
} catch {
case NonFatal(e) =>
logWarning(s"Failed to checkpoint shardId $shardId to DynamoDB.", e)
logWarning(log"Failed to checkpoint shardId ${MDC(SHARD_ID, shardId)} to DynamoDB.", e)
}
}

@@ -1390,8 +1390,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case _: IOException if !retried =>
// compaction may touch the file(s) which app rebuild wants to read
// compaction wouldn't run in short interval, so try again...
logWarning(s"Exception occurred while rebuilding log path ${attempt.logPath} - " +
"trying again...")
logWarning(log"Exception occurred while rebuilding log path " +
log"${MDC(LogKeys.PATH, attempt.logPath)} - trying again...")
retried = true

case e: Exception =>
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, Maste
import org.apache.spark.deploy.Utils.addRenderLogHandler
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{HOST_NAMES, NUM_REMOVED_WORKERS}
import org.apache.spark.internal.LogKeys.{HOSTS, NUM_REMOVED_WORKERS}
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE
import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
@@ -79,7 +79,7 @@ class MasterWebUI(
} else {
val removedWorkers = masterEndpointRef.askSync[Integer](
DecommissionWorkersOnHosts(hostnames))
logInfo(log"Decommissioning of hosts ${MDC(HOST_NAMES, hostnames)}" +
logInfo(log"Decommissioning of hosts ${MDC(HOSTS, hostnames)}" +
log" decommissioned ${MDC(NUM_REMOVED_WORKERS, removedWorkers)} workers")
if (removedWorkers > 0) {
resp.setStatus(HttpServletResponse.SC_OK)
@@ -83,7 +83,7 @@ private[spark] class Executor(
extends Logging {

logInfo(log"Starting executor ID ${LogMDC(LogKeys.EXECUTOR_ID, executorId)}" +
log" on host ${LogMDC(HOST_NAME, executorHostname)}")
log" on host ${LogMDC(HOST, executorHostname)}")
logInfo(log"OS info ${LogMDC(OS_NAME, System.getProperty("os.name"))}," +
log" ${LogMDC(OS_VERSION, System.getProperty("os.version"))}, " +
log"${LogMDC(OS_ARCH, System.getProperty("os.arch"))}")
@@ -53,7 +53,8 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
fsDataOutputStream.close()
} catch {
case NonFatal(e) =>
logWarning(s"Error cancelling write to $path (stream: $fsDataOutputStream)", e)
logWarning(log"Error cancelling write to ${MDC(LogKeys.PATH, path)} " +
log"(stream: ${MDC(LogKeys.FS_DATA_OUTPUT_STREAM, fsDataOutputStream)})", e)
} finally {
terminated = true
}
@@ -71,7 +72,8 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
fsDataOutputStream.close()
} catch {
case NonFatal(e) =>
logWarning(s"Error closing $path (stream: $fsDataOutputStream)", e)
logWarning(log"Error closing ${MDC(LogKeys.PATH, path)} " +
log"(stream: ${MDC(LogKeys.FS_DATA_OUTPUT_STREAM, fsDataOutputStream)})", e)
} finally {
terminated = true
}
@@ -27,8 +27,7 @@ import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{CONFIG, EXECUTOR_ENV_REGEX}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -107,9 +106,11 @@ class KubernetesDriverConf(
} else {
val randomServiceId = KubernetesUtils.uniqueID(clock)
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
s"$shorterServiceName as the driver service's name.")
logWarning(log"Driver's hostname would preferably be " +
log"${MDC(LogKeys.PREFERRED_SERVICE_NAME, preferredServiceName)}, but this is too long " +
log"(must be <= ${MDC(LogKeys.MAX_SERVICE_NAME_LENGTH, MAX_SERVICE_NAME_LENGTH)} " +
log"characters). Falling back to use " +
log"${MDC(LogKeys.SHORTER_SERVICE_NAME, shorterServiceName)} as the driver service's name.")
shorterServiceName
}
}
@@ -242,10 +243,10 @@ private[spark] class KubernetesExecutorConf(
if (executorEnvRegex.pattern.matcher(key).matches()) {
true
} else {
logWarning(log"Invalid key: ${MDC(CONFIG, key)}, " +
logWarning(log"Invalid key: ${MDC(LogKeys.CONFIG, key)}, " +
log"a valid environment variable name must consist of alphabetic characters, " +
log"digits, '_', '-', or '.', and must not start with a digit. " +
log"Regex used for validation is '${MDC(EXECUTOR_ENV_REGEX, executorEnvRegex)}'")
log"Regex used for validation is '${MDC(LogKeys.EXECUTOR_ENV_REGEX, executorEnvRegex)}'")
false
}
}