[SPARK-48890][CORE][SS] Add Structured Streaming related fields to log4j ThreadContext

### What changes were proposed in this pull request?

Structured Streaming queries carry some extra identifying information: each query has a query_id and a run_id, and when MicroBatchExecution is used (the default) there is also a batch_id.

Together, (query_id, run_id, batch_id) identifies the microbatch a streaming query is running. Adding these fields to the log4j ThreadContext makes log lines much easier to correlate, especially when multiple queries run in the same driver; a sketch of the underlying pattern follows.
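For context, this relies on log4j2's CloseableThreadContext API. The snippet below is an illustrative sketch only (not the committed code; the object, method, and key names are simplified): fields put into the ThreadContext are attached to every log event emitted by the thread until the returned instance is closed.

```scala
import scala.jdk.CollectionConverters._

import org.apache.logging.log4j.{CloseableThreadContext, LogManager}

object ThreadContextSketch {
  private val logger = LogManager.getLogger(getClass)

  def runQuery(queryId: String, runId: String): Unit = {
    // query_id and run_id are attached to every log event from this thread
    // until the returned instance is closed.
    val ctx = CloseableThreadContext.putAll(
      Map("query_id" -> queryId, "run_id" -> runId).asJava)
    try {
      logger.info("Starting new streaming query.") // carries query_id, run_id
      // A per-batch field can be layered on top for MicroBatchExecution.
      ctx.put("batch_id", "0")
      logger.info("Committed offsets for batch 0.") // also carries batch_id
    } finally {
      ctx.close() // removes the keys from the ThreadContext again
    }
  }
}
```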

### Why are the changes needed?

Logging improvement: with these fields, every log line emitted while a query (or microbatch) is running can be filtered and correlated by query_id, run_id, and batch_id.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Ran a streaming query through spark-submit; here are sample logs (search for query_id, run_id, or batch_id):

```
{"ts":"2024-07-15T19:56:01.577Z","level":"INFO","msg":"Starting new streaming query.","context":{"query_id":"094ebe4a-30a3-4541-90af-ca238e4e6697","run_id":"67b161c5-83e5-430a-a905-04815a0002f4"},"logger":"MicroBatchExecution"}
{"ts":"2024-07-15T19:56:01.579Z","level":"INFO","msg":"Stream started from {}","context":{"query_id":"094ebe4a-30a3-4541-90af-ca238e4e6697","run_id":"67b161c5-83e5-430a-a905-04815a0002f4","streaming_offsets_start":"{}"},"logger":"MicroBatchExecution"}
{"ts":"2024-07-15T19:56:01.602Z","level":"INFO","msg":"Writing atomically to file:/private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-037d26ae-0d6f-4771-9de3-d028730520e0/offsets/0 using temp file file:/private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-037d26ae-0d6f-4771-9de3-d028730520e0/offsets/.0.566e3ae0-a15e-438c-82c1-26cc109746b3.tmp","context":{"final_path":"file:/private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-037d26ae-0d6f-4771-9de3-d028730520e0/offsets/0","query_id":"094ebe4a-30a3-4541-90af-ca238e4e6697","run_id":"67b161c5-83e5-430a-a905-04815a0002f4","temp_path":"file:/private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-037d26ae-0d6f-4771-9de3-d028730520e0/offsets/.0.566e3ae0-a15e-438c-82c1-26cc109746b3.tmp"},"logger":"CheckpointFileManager"}
{"ts":"2024-07-15T19:56:01.675Z","level":"INFO","msg":"Renamed temp file file:/private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-037d26ae-0d6f-4771-9de3-d028730520e0/offsets/.0.566e3ae0-a15e-438c-82c1-26cc109746b3.tmp to file:/private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-037d26ae-0d6f-4771-9de3-d028730520e0/offsets/0","context":{"final_path":"file:/private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-037d26ae-0d6f-4771-9de3-d028730520e0/offsets/0","query_id":"094ebe4a-30a3-4541-90af-ca238e4e6697","run_id":"67b161c5-83e5-430a-a905-04815a0002f4","temp_path":"file:/private/var/folders/9k/pbxb4_690wv4smwhwbzwmqkw0000gp/T/temporary-037d26ae-0d6f-4771-9de3-d028730520e0/offsets/.0.566e3ae0-a15e-438c-82c1-26cc109746b3.tmp"},"logger":"CheckpointFileManager"}
{"ts":"2024-07-15T19:56:01.676Z","level":"INFO","msg":"Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1721073361582,HashMap(spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, spark.sql.streaming.stateStore.rocksdb.formatVersion -> 5, spark.sql.streaming.statefulOperator.useStrictDistribution -> true, spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion -> 2, spark.sql.streaming.multipleWatermarkPolicy -> min, spark.sql.streaming.aggregation.stateFormatVersion -> 2, spark.sql.shuffle.partitions -> 200, spark.sql.streaming.join.stateFormatVersion -> 2, spark.sql.streaming.stateStore.compression.codec -> lz4))","context":{"batch_id":"0","offset_sequence_metadata":"OffsetSeqMetadata(0,1721073361582,HashMap(spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, spark.sql.streaming.stateStore.rocksdb.formatVersion -> 5, spark.sql.streaming.statefulOperator.useStrictDistribution -> true, spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion -> 2, spark.sql.streaming.multipleWatermarkPolicy -> min, spark.sql.streaming.aggregation.stateFormatVersion -> 2, spark.sql.shuffle.partitions -> 200, spark.sql.streaming.join.stateFormatVersion -> 2, spark.sql.streaming.stateStore.compression.codec -> lz4))","query_id":"094ebe4a-30a3-4541-90af-ca238e4e6697","run_id":"67b161c5-83e5-430a-a905-04815a0002f4"},"logger":"MicroBatchExecution"}
{"ts":"2024-07-15T19:56:02.074Z","level":"INFO","msg":"Code generated in 97.122375 ms","context":{"batch_id":"0","query_id":"094ebe4a-30a3-4541-90af-ca238e4e6697","run_id":"67b161c5-83e5-430a-a905-04815a0002f4","total_time":"97.122375"},"logger":"CodeGenerator"}
{"ts":"2024-07-15T19:56:02.125Z","level":"INFO","msg":"Start processing data source write support: MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.datasources.noop.NoopStreamingWrite$20ba1e29]. The input RDD has 1} partitions.","context":{"batch_id":"0","batch_write":"MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.datasources.noop.NoopStreamingWrite$20ba1e29]","count":"1","query_id":"094ebe4a-30a3-4541-90af-ca238e4e6697","run_id":"67b161c5-83e5-430a-a905-04815a0002f4"},"logger":"WriteToDataSourceV2Exec"}
{"ts":"2024-07-15T19:56:02.129Z","level":"INFO","msg":"Starting job: start at NativeMethodAccessorImpl.java:0","context":{"batch_id":"0","call_site_short_form":"start at NativeMethodAccessorImpl.java:0","query_id":"094ebe4a-30a3-4541-90af-ca238e4e6697","run_id":"67b161c5-83e5-430a-a905-04815a0002f4"},"logger":"SparkContext"}
{"ts":"2024-07-15T19:56:02.135Z","level":"INFO","msg":"Got job 0 (start at NativeMethodAccessorImpl.java:0) with 1 output partitions","context":{"call_site_short_form":"start at NativeMethodAccessorImpl.java:0","job_id":"0","num_partitions":"1"},"logger":"DAGScheduler"}
```
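With these fields present, log lines from concurrent queries can be grouped mechanically. As a hedged sketch (using json4s, which Spark already depends on; this helper is not part of the PR), the identifiers can be pulled out of a structured log line like the samples above:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object StreamingLogIds {
  implicit val formats: Formats = DefaultFormats

  /** Extracts (query_id, run_id, batch_id) from one structured log line, if present. */
  def ids(line: String): (Option[String], Option[String], Option[String]) = {
    val context = parse(line) \ "context"
    ((context \ "query_id").extractOpt[String],
      (context \ "run_id").extractOpt[String],
      (context \ "batch_id").extractOpt[String])
  }
  // e.g. logLines.filter(ids(_)._1.contains(targetQueryId))
}
```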

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#47340 from WweiL/structured-logging-streaming-id-aware.

Authored-by: Wei Liu <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
WweiL authored and gengliangwang committed Jul 18, 2024
1 parent 83a9791 commit 723c1c5
Showing 2 changed files with 15 additions and 1 deletion.
@@ -820,6 +820,8 @@ class MicroBatchExecution(
sparkSessionToRunBatch.sparkContext.setLocalProperty(
StreamExecution.IS_CONTINUOUS_PROCESSING, false.toString)

loggingThreadContext.put(LogKeys.BATCH_ID.name, execCtx.batchId.toString)

execCtx.reportTimeTaken("queryPlanning") {
execCtx.executionPlan = new IncrementalExecution(
sparkSessionToRunBatch,
@@ -30,10 +30,11 @@ import scala.util.control.NonFatal

import com.google.common.util.concurrent.UncheckedExecutionException
import org.apache.hadoop.fs.Path
import org.apache.logging.log4j.CloseableThreadContext

import org.apache.spark.{JobArtifactSet, SparkContext, SparkException, SparkThrowable}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{CHECKPOINT_PATH, CHECKPOINT_ROOT, LOGICAL_PLAN, PATH, PRETTY_ID_STRING, SPARK_DATA_STREAM}
import org.apache.spark.internal.LogKeys.{CHECKPOINT_PATH, CHECKPOINT_ROOT, LOGICAL_PLAN, PATH, PRETTY_ID_STRING, QUERY_ID, RUN_ID, SPARK_DATA_STREAM}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
@@ -90,6 +91,8 @@ abstract class StreamExecution(
protected val awaitProgressLock = new ReentrantLock(true)
protected val awaitProgressLockCondition = awaitProgressLock.newCondition()

protected var loggingThreadContext: CloseableThreadContext.Instance = _

private val initializationLatch = new CountDownLatch(1)
private val startLatch = new CountDownLatch(1)
private val terminationLatch = new CountDownLatch(1)
@@ -287,6 +290,11 @@ abstract class StreamExecution(
sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
interruptOnCancel = true)
sparkSession.sparkContext.setLocalProperty(StreamExecution.QUERY_ID_KEY, id.toString)
loggingThreadContext = CloseableThreadContext.putAll(
Map(
QUERY_ID.name -> id.toString,
RUN_ID.name -> runId.toString
).asJava)
if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
@@ -405,6 +413,10 @@ abstract class StreamExecution(
new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString),
errorClassOpt))

if (loggingThreadContext != null) {
loggingThreadContext.close()
}

// Delete the temp checkpoint when either force delete enabled or the query didn't fail
if (deleteCheckpointOnStop &&
(sparkSession.sessionState.conf