[FLINK-38940] Start checkpointing when all tasks are running and job is running#27441
[FLINK-38940] Start checkpointing when all tasks are running and job is running#27441rkhachatryan merged 1 commit intoapache:masterfrom
Conversation
| <td><h5>execution.checkpointing.pause-checkpoints-if-tasks-not-running</h5></td> | ||
| <td style="word-wrap: break-word;">false</td> | ||
| <td>Boolean</td> | ||
| <td>When enabled, checkpoints will be paused as long as some tasks are not running (and not finished), even if the job status is running. This help to prevent an additional delay on job startup</td> |
There was a problem hiding this comment.
I am struggling with these sentences.
- nit it would be simpler to say if any tasks rather than as long as some tasks
- What does it mean to say a task is not running, but has a status of running? I think we need to talk about non blocking output, maybe more words outside of the config definition to say how to recognise non blocking jobs - as this relates to the subset of jobs we are effecting with this option.
- nit: This help -> This helps
- the text says "prevent an additional delay on job startup". But the logic does not seem startup specific? Can we get into any other non startup related scenarios where this might be helpful
- Can we guide the user as to when they should consider using this option and any downside to using it.
There was a problem hiding this comment.
We discussed this PR with @1996fanrui offline and agreed to remove the option because the new behavior is the only desirable (until FLIP-457). After FLIP-457, this logic will be changed.
| this.coordinator = checkNotNull(coordinator); | ||
| this.allTasksOutputNonBlocking = allTasksOutputNonBlocking; | ||
| return (jobId, newJobStatus, timestamp) -> { | ||
| if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking) { |
There was a problem hiding this comment.
I an curious - should we be checking the new config option here? I thought this is the condition we wanted to pause checkpointing for? Am I missing something?
There was a problem hiding this comment.
The check is in CheckpointCoordinator.createActivatorDeactivator, but as mentioned above, I'm going to remove this option.
| new CheckpointCoordinatorDeActivator(this, allTasksOutputNonBlocking); | ||
| if (deActivator == null) { | ||
| if (pauseCheckpointsIfTasksNotRunning && allTasksOutputNonBlocking) { | ||
| deActivator = new ExecutionTrackingCheckpointCoordinatorDeActivator(this); |
There was a problem hiding this comment.
Maybe a comment to indicate why this if body results in a pause.
2012346 to
bb34b02
Compare
| static CheckpointCoordinatorDeActivator forJobStatus( | ||
| CheckpointCoordinator coordinator, boolean allTasksOutputNonBlocking) { | ||
| this.coordinator = checkNotNull(coordinator); | ||
| this.allTasksOutputNonBlocking = allTasksOutputNonBlocking; | ||
| return (jobId, newJobStatus, timestamp) -> { | ||
| if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking) { | ||
| // start the checkpoint scheduler if there is no blocking edge | ||
| coordinator.startCheckpointScheduler(); | ||
| } else { | ||
| // anything else should stop the trigger for now | ||
| coordinator.stopCheckpointScheduler(); | ||
| } |
There was a problem hiding this comment.
forJobStatus is only called once, and allTasksOutputNonBlocking is false. so forJobStatus can be simplified.
There was a problem hiding this comment.
Right, I've simplified the code, PTAL.
No description provided.