Skip to content

[SPARK-56326] Include streaming query and batch ids in scheduling logs#55166

Open
BrooksWalls wants to merge 4 commits intoapache:masterfrom
BrooksWalls:SPARK-56326/streamingQueryIdAndBatchIdInSchedulingLogs
Open

[SPARK-56326] Include streaming query and batch ids in scheduling logs#55166
BrooksWalls wants to merge 4 commits intoapache:masterfrom
BrooksWalls:SPARK-56326/streamingQueryIdAndBatchIdInSchedulingLogs

Conversation

@BrooksWalls
Copy link
Copy Markdown

What changes were proposed in this pull request?

This change adds the streaming query Id and batch Id to some of the scheduling logs in order to aid in debugging structured streaming queries.

There are three log lines which have been updated to include the query and batch Id:

26/04/02 16:34:01 INFO TaskSetManager: [queryId = 1251e] [batchId = 5] Starting task 0.0 in stage 5.0 (TID 129) (...,executor driver, partition 0, PROCESS_LOCAL, 9728 bytes)

26/04/02 16:34:01 INFO TaskSetManager: [queryId = 1251e] [batchId = 5] Finished task 6.0 in stage 5.0 (TID 135) in 12 ms on ...(executor driver) (6/32)

26/04/02 16:39:09 INFO FairSchedulableBuilder: [queryId = f5660] [batchId = 5] Added task set TaskSet_5.0 to pool default

Why are the changes needed?

When debugging multiple streaming queries running at the same time it can be difficult to go through the scheduling logs. By including the query and batch Id it is much easier to isolate logs to specific queries and batches.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests were added.

Also manually tested by running the spark shell and redirecting info logs to a temporary file. Then ran a basic streaming query and grepped the temp file for the desired log lines to ensure they included the query and batch id. Also confirmed a batch query ran in the shell does not include the query and batch Id in its logs.

Was this patch authored or co-authored using generative AI tooling?

yes, coauthored

Generated-by: claude

Copy link
Copy Markdown
Contributor

@dichlorodiphen dichlorodiphen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good

@BrooksWalls BrooksWalls force-pushed the SPARK-56326/streamingQueryIdAndBatchIdInSchedulingLogs branch from 10d7760 to 92d2f95 Compare April 2, 2026 20:09
* Mix this trait into any scheduler component that has access to task
* properties and needs streaming-aware log output.
*/
private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a trait here so all logs published from TaskSetManager will include the query and batch Id when present

*/
private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging {
// we gather the query and batch Id from the properties of a given TaskSet
protected def properties: Properties
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we can't rely on thread local properties, we need to gather the query and batch Id from the taskSet's properties, this must be set by class which mixes in the trait

* Helpers for constructing log entries enriched with structured streaming
* identifiers extracted from task properties.
*/
private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Logging {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uses a companion object here so that we can call the methods from SchedulableBuilder which can not set one Properties object at construction as it's shared across tasks

Comment on lines +116 to +122
// formatMessage truncates the queryId for readability
// so we use a blank messageWithContext to overwrite the full query Id to the context
formatMessage(
queryId,
batchId,
entry
) + MessageWithContext("", constructStreamingContext(queryId, batchId))
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little clunky but I wanted to truncate the query Id in the outputted log line so that its more readable as you scan through, but still have the full query id in the log context. To do that we use a blank log line with the query context hashmap so it overrides the truncated query Id.

Comment on lines +153 to +155
// MDC places the log key in the context as all lowercase, so we do the same here
queryId.foreach(streamingContext.put(LogKeys.QUERY_ID.name.toLowerCase(Locale.ROOT), _))
batchId.foreach(streamingContext.put(LogKeys.BATCH_ID.name.toLowerCase(Locale.ROOT), _))
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the lowercase is necessary here or not, but wanted to match the behavior of the log interpolator

healthTracker: Option[HealthTracker] = None,
clock: Clock = new SystemClock()) extends Schedulable with Logging {
clock: Clock = new SystemClock())
extends Schedulable with StructuredStreamingIdAwareSchedulerLogging {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we extend the StructuredStreamingIdAwareSchedulerLogging instead of logging, all logs published will include the query and batch Id when handling a streaming query TaskSet

log"${MDC(LogKeys.POOL_NAME, poolName)}")

logInfo(
StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like I said above, since this method is called for many different TaskSets we have to use the companion object's method

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants