Skip to content

Conversation

@arorasachin9
Copy link
Contributor

Resolves #60517
The timeouts for the deferred operators are not handled properly. This fixes the by implementing the cleanup method.

Copy link
Contributor

@henry3260 henry3260 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add unit tests for this? That would be super helpful. Thanks!

region_name: str | None = None,
verify: bool | str | None = None,
botocore_config: dict | None = None,
cancel_waiter_names: list[str] | None = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add the corresponding docstring for this parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am waiting for the high level approval for the approach from @vincbeck . Once I get go ahead with the approach then I'll add UT's and the doc string also.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry forgot to change the PR to draft

@arorasachin9 arorasachin9 marked this pull request as draft January 15, 2026 15:33
)
yield TriggerEvent({"status": "success", self.return_key: self.return_value})

async def cancel(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is it being called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vincbeck ,
This function is the function defined in the baseTrigger class.

    async def cleanup(self) -> None:
        """
        Cleanup the trigger.

        Called when the trigger is no longer needed, and it's being removed
        from the active triggerer process.

        This method follows the async/await pattern to allow to run the cleanup
        in triggerer main event loop. Exceptions raised by the cleanup method
        are ignored, so if you would like to be able to debug them and be notified
        that cleanup method failed, you should wrap your code with try/except block
        and handle it appropriately (in async-compatible way).
        """

And this cleanup function is called in the finally block of the runner function of trigerrer

async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, timeout_after: datetime | None = None):
        """Run a trigger (they are async generators) and push their events into our outbound event deque."""
        if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true":
            import greenback

            await greenback.ensure_portal()

        bind_log_contextvars(trigger_id=trigger_id)

        name = self.triggers[trigger_id]["name"]
        self.log.info("trigger %s starting", name)
        try:
            async for event in trigger.run():
                await self.log.ainfo(
                    "Trigger fired event", name=self.triggers[trigger_id]["name"], result=event
                )
                self.triggers[trigger_id]["events"] += 1
                self.events.append((trigger_id, event))
        except asyncio.CancelledError:
            # We get cancelled by the scheduler changing the task state. But if we do lets give a nice error
            # message about it
            if timeout := timeout_after:
                timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout
                if timeout < timezone.utcnow():
                    await self.log.aerror("Trigger cancelled due to timeout")
            raise
        finally:
            # CancelledError will get injected when we're stopped - which is
            # fine, the cleanup process will understand that, but we want to
            # allow triggers a chance to cleanup, either in that case or if
            # they exit cleanly. Exception from cleanup methods are ignored.
            with suppress(Exception):
                await trigger.cleanup()

            await self.log.ainfo("trigger completed", name=name)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:amazon AWS/Amazon - related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Triggerer timeout exception is not handled properly

4 participants