[SPARK-56326] Include streaming query and batch ids in scheduling logs#55166
[SPARK-56326] Include streaming query and batch ids in scheduling logs#55166BrooksWalls wants to merge 4 commits intoapache:masterfrom
Conversation
dichlorodiphen
left a comment
There was a problem hiding this comment.
Generally looks good
10d7760 to
92d2f95
Compare
| * Mix this trait into any scheduler component that has access to task | ||
| * properties and needs streaming-aware log output. | ||
| */ | ||
| private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging { |
There was a problem hiding this comment.
Use a trait here so all logs published from TaskSetManager will include the query and batch Id when present
| */ | ||
| private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging { | ||
| // we gather the query and batch Id from the properties of a given TaskSet | ||
| protected def properties: Properties |
There was a problem hiding this comment.
Since we can't rely on thread local properties, we need to gather the query and batch Id from the taskSet's properties, this must be set by class which mixes in the trait
| * Helpers for constructing log entries enriched with structured streaming | ||
| * identifiers extracted from task properties. | ||
| */ | ||
| private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Logging { |
There was a problem hiding this comment.
uses a companion object here so that we can call the methods from SchedulableBuilder which can not set one Properties object at construction as it's shared across tasks
| // formatMessage truncates the queryId for readability | ||
| // so we use a blank messageWithContext to overwrite the full query Id to the context | ||
| formatMessage( | ||
| queryId, | ||
| batchId, | ||
| entry | ||
| ) + MessageWithContext("", constructStreamingContext(queryId, batchId)) |
There was a problem hiding this comment.
This is a little clunky but I wanted to truncate the query Id in the outputted log line so that its more readable as you scan through, but still have the full query id in the log context. To do that we use a blank log line with the query context hashmap so it overrides the truncated query Id.
| // MDC places the log key in the context as all lowercase, so we do the same here | ||
| queryId.foreach(streamingContext.put(LogKeys.QUERY_ID.name.toLowerCase(Locale.ROOT), _)) | ||
| batchId.foreach(streamingContext.put(LogKeys.BATCH_ID.name.toLowerCase(Locale.ROOT), _)) |
There was a problem hiding this comment.
I'm not sure if the lowercase is necessary here or not, but wanted to match the behavior of the log interpolator
| healthTracker: Option[HealthTracker] = None, | ||
| clock: Clock = new SystemClock()) extends Schedulable with Logging { | ||
| clock: Clock = new SystemClock()) | ||
| extends Schedulable with StructuredStreamingIdAwareSchedulerLogging { |
There was a problem hiding this comment.
Since we extend the StructuredStreamingIdAwareSchedulerLogging instead of logging, all logs published will include the query and batch Id when handling a streaming query TaskSet
| log"${MDC(LogKeys.POOL_NAME, poolName)}") | ||
|
|
||
| logInfo( | ||
| StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry( |
There was a problem hiding this comment.
Like I said above, since this method is called for many different TaskSets we have to use the companion object's method
What changes were proposed in this pull request?
This change adds the streaming query Id and batch Id to some of the scheduling logs in order to aid in debugging structured streaming queries.
There are three log lines which have been updated to include the query and batch Id:
26/04/02 16:34:01 INFO TaskSetManager: [queryId = 1251e] [batchId = 5] Starting task 0.0 in stage 5.0 (TID 129) (...,executor driver, partition 0, PROCESS_LOCAL, 9728 bytes)26/04/02 16:34:01 INFO TaskSetManager: [queryId = 1251e] [batchId = 5] Finished task 6.0 in stage 5.0 (TID 135) in 12 ms on ...(executor driver) (6/32)26/04/02 16:39:09 INFO FairSchedulableBuilder: [queryId = f5660] [batchId = 5] Added task set TaskSet_5.0 to pool defaultWhy are the changes needed?
When debugging multiple streaming queries running at the same time it can be difficult to go through the scheduling logs. By including the query and batch Id it is much easier to isolate logs to specific queries and batches.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests were added.
Also manually tested by running the spark shell and redirecting info logs to a temporary file. Then ran a basic streaming query and grepped the temp file for the desired log lines to ensure they included the query and batch id. Also confirmed a batch query ran in the shell does not include the query and batch Id in its logs.
Was this patch authored or co-authored using generative AI tooling?
yes, coauthored
Generated-by: claude