-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Handling the job cancel in case of timeouts #60588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
henry3260
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add unit tests for this? That would be super helpful. Thanks!
| region_name: str | None = None, | ||
| verify: bool | str | None = None, | ||
| botocore_config: dict | None = None, | ||
| cancel_waiter_names: list[str] | None = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add the corresponding docstring for this parameter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am waiting for the high level approval for the approach from @vincbeck . Once I get go ahead with the approach then I'll add UT's and the doc string also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry forgot to change the PR to draft
| ) | ||
| yield TriggerEvent({"status": "success", self.return_key: self.return_value}) | ||
|
|
||
| async def cancel(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is it being called?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vincbeck ,
This function is the function defined in the baseTrigger class.
async def cleanup(self) -> None:
"""
Cleanup the trigger.
Called when the trigger is no longer needed, and it's being removed
from the active triggerer process.
This method follows the async/await pattern to allow to run the cleanup
in triggerer main event loop. Exceptions raised by the cleanup method
are ignored, so if you would like to be able to debug them and be notified
that cleanup method failed, you should wrap your code with try/except block
and handle it appropriately (in async-compatible way).
"""
And this cleanup function is called in the finally block of the runner function of trigerrer
async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after: datetime | None = None):
"""Run a trigger (they are async generators) and push their events into our outbound event deque."""
if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true":
import greenback
await greenback.ensure_portal()
bind_log_contextvars(trigger_id=trigger_id)
name = self.triggers[trigger_id]["name"]
self.log.info("trigger %s starting", name)
try:
async for event in trigger.run():
await self.log.ainfo(
"Trigger fired event", name=self.triggers[trigger_id]["name"], result=event
)
self.triggers[trigger_id]["events"] += 1
self.events.append((trigger_id, event))
except asyncio.CancelledError:
# We get cancelled by the scheduler changing the task state. But if we do lets give a nice error
# message about it
if timeout := timeout_after:
timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout
if timeout < timezone.utcnow():
await self.log.aerror("Trigger cancelled due to timeout")
raise
finally:
# CancelledError will get injected when we're stopped - which is
# fine, the cleanup process will understand that, but we want to
# allow triggers a chance to cleanup, either in that case or if
# they exit cleanly. Exception from cleanup methods are ignored.
with suppress(Exception):
await trigger.cleanup()
await self.log.ainfo("trigger completed", name=name)
Resolves #60517
The timeouts for the deferred operators are not handled properly. This fixes the by implementing the cleanup method.