Skip to content

feat(cdk): Add cursor age validation to StateDelegatingStream#890

Open
devin-ai-integration[bot] wants to merge 16 commits intomainfrom
devin/1770066385-state-delegating-stream-cursor-age-validation
Open

feat(cdk): Add cursor age validation to StateDelegatingStream#890
devin-ai-integration[bot] wants to merge 16 commits intomainfrom
devin/1770066385-state-delegating-stream-cursor-age-validation

Conversation

@devin-ai-integration
Copy link
Contributor

@devin-ai-integration devin-ai-integration bot commented Feb 2, 2026

Summary

Adds an optional api_retention_period field to StateDelegatingStream that validates whether a cursor is within an API's data retention window before using incremental sync. When the cursor is older than the retention period, the connector automatically falls back to full refresh to avoid data loss.

This addresses the issue where APIs like Stripe Events only retain data for 30 days - if a sync fails mid-way and resumes after the retention window, incremental sync would miss data.

Key changes:

  • Added api_retention_period field to StateDelegatingStream schema (ISO8601 duration format: P30D, P1D, PT1H, etc.)
  • Implemented cursor age validation in _get_state_delegating_stream_model method
  • Parses cursor datetime formats from both full_refresh_stream and incremental_stream (since state can be produced by either depending on sync number)
  • Supports per-partition state structures by extracting the global cursor from the "state" key
  • Raises ValueError when IncrementingCountCursor is used with api_retention_period (count-based cursors don't support datetime age validation)
  • Emits warning log when falling back to full refresh due to stale cursor
  • Falls back to full refresh when cursor is invalid/unparseable (safe default)
  • Added get_cursor_datetime_from_state method to cursor classes for future extensibility
  • No breaking changes - the field is optional with no default

Example usage:

type: StateDelegatingStream
name: events
api_retention_period: P30D  # 30 days
full_refresh_stream: ...
incremental_stream: ...

Updates since last revision

  • Cursor class methods added: Added get_cursor_datetime_from_state(stream_state) -> Optional[datetime] method to:
    • DeclarativeCursor base class (raises NotImplementedError by default)
    • DatetimeBasedCursor (extracts cursor using cursor_field and parses with parse_date)
    • PerPartitionCursor (extracts global cursor from "state" key and delegates to cursor factory)
  • Per-partition state support: Added _extract_cursor_value_from_state helper that extracts cursor from the global "state" key for per-partition state structures
  • IncrementingCountCursor validation: Raises ValueError when used with api_retention_period
  • Unit tests: 9 tests covering cursor age validation scenarios, per-partition state, and error handling

Review & Testing Checklist for Human

  • Verify per-partition state handling - the code extracts cursor from the global "state" key ({"state": {"cursor_field": "value"}, "states": [...]}). Confirm this matches how per-partition cursors actually store their global cursor value.
  • Verify IncrementingCountCursor error timing - the error is raised during discover() or read() when streams are created. Confirm this is the appropriate place to surface this configuration error.
  • Check getattr usage for accessing cursor_field, datetime_format, cursor_datetime_formats - these assume the incremental_sync model has these attributes.
  • Review new cursor class methods - get_cursor_datetime_from_state is a new public method on DeclarativeCursor. The factory currently uses internal parsing logic, but these methods are available for future use.

Recommended test plan: Apply this change to the Stripe connector's events stream with api_retention_period: P30D, set a cursor state older than 30 days, and verify it falls back to full refresh with the expected warning message. Also test with a per-partition cursor state to verify the global cursor extraction works correctly.

Notes

Fixes: https://github.com/airbytehq/oncall/issues/11103

Link to Devin run: https://app.devin.ai/sessions/c6b25a1216c547139ef8242062f7f135
Requested by: unknown ()

This adds an optional api_retention_period field to StateDelegatingStream
that validates whether a cursor is within an API's data retention window
before switching from full refresh to incremental sync.

When the cursor value is older than the retention period, the connector
automatically falls back to a full refresh to avoid data loss. This is
useful for APIs like Stripe Events API which only retain data for 30 days.

Key changes:
- Add api_retention_period field to StateDelegatingStream schema (ISO8601 duration)
- Implement cursor age validation in model_to_component_factory.py
- Emit warning log when falling back to full refresh due to stale cursor
- Add unit tests for cursor age validation

Fixes: airbytehq/oncall#11103
Co-Authored-By: unknown <>
@devin-ai-integration
Copy link
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link

github-actions bot commented Feb 3, 2026

PyTest Results (Fast)

3 862 tests  +7   3 850 ✅ +7   6m 36s ⏱️ +9s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 2b54cc5. ± Comparison against base commit 15542de.

♻️ This comment has been updated with latest results.

@github-actions
Copy link

github-actions bot commented Feb 3, 2026

PyTest Results (Full)

3 865 tests  +7   3 853 ✅ +7   11m 6s ⏱️ +5s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 2b54cc5. ± Comparison against base commit 15542de.

♻️ This comment has been updated with latest results.

@agarctfi
Copy link
Contributor

agarctfi commented Feb 3, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

octavia-squidington-iii and others added 2 commits February 3, 2026 16:03
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
@agarctfi agarctfi marked this pull request as ready for review February 3, 2026 17:34
Copilot AI review requested due to automatic review settings February 3, 2026 17:34
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request adds cursor age validation to StateDelegatingStream to automatically fall back to full refresh when a cursor is older than an API's data retention period. This prevents data loss scenarios where APIs (like Stripe Events) only retain data for a limited time window (e.g., 30 days), and a sync resumes after that window has passed.

Changes:

  • Added optional api_retention_period field to StateDelegatingStream schema (ISO8601 duration format)
  • Implemented cursor age validation logic that compares cursor datetime against retention cutoff
  • Added warning log emission when falling back to full refresh due to stale cursor

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

File Description
airbyte_cdk/sources/declarative/declarative_component_schema.yaml Added api_retention_period field definition with ISO8601 duration examples
airbyte_cdk/sources/declarative/models/declarative_component_schema.py Auto-generated model updates reflecting the new field in StateDelegatingStream
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py Core implementation of cursor age validation with three new helper methods
unit_tests/sources/declarative/test_state_delegating_stream.py Added three test cases covering cursor age validation scenarios (too old, within retention, edge case)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

- Fix YAML bullet point indentation for consistency (Comment 8)
- Add type guard for cursor_value to handle unexpected types (Comment 9)
- Add test for warning log emission when cursor is too old (Comment 10)

Co-Authored-By: unknown <>
devin-ai-integration bot and others added 3 commits February 3, 2026 18:24
Co-Authored-By: unknown <>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
@agarctfi agarctfi requested review from brianjlai and tolik0 February 3, 2026 19:06
Returns True if the cursor is older than the retention period (should use full refresh).
Returns False if the cursor is within the retention period (safe to use incremental).
"""
incremental_sync = incremental_stream.incremental_sync
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move the incremental_sync check into the create_state_delegating_stream method; both full_refresh_stream and incremental_stream should have incremental_sync set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! I've moved the incremental_sync check into _get_state_delegating_stream_model. The method now extracts incremental_sync from model.incremental_stream.incremental_sync and passes it directly to _is_cursor_older_than_retention_period, which now accepts incremental_sync: Any instead of incremental_stream: DeclarativeStreamModel.

See commit c14f963.

…_model

Addresses review comment from tolik0 - the incremental_sync check is now
performed in the calling method before invoking _is_cursor_older_than_retention_period.

Co-Authored-By: unknown <>
"""Parse the cursor value into a datetime object using the cursor's datetime formats."""
parser = DatetimeParser()

datetime_format = getattr(incremental_sync, "datetime_format", None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably use incremental sync from the full_refresh_stream, not the incremental_stream, since the cursor is from the full_refresh_stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to make sure I understand correctly. The cursor in the state would have been written by the incremental_stream during a previous sync, so I assumed its datetime format configuration should match the incremental_stream's incremental_sync settings.

Could you clarify why we should use full_refresh_stream.incremental_sync instead? Is it because in StateDelegatingStream the full_refresh_stream also has an incremental_sync configuration that defines the cursor format, and that's the authoritative source for cursor parsing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During the first sync, the state will be produced by full_refresh_stream, and during subsequent syncs, by incremental_stream. We need to correctly parse the state for both cases.


cursor_value = stream_state.get(cursor_field)
if not cursor_value:
return False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the cursor value or format is incorrect, we should use the full_refresh_stream instead, as it indicates that the stream_state is unusable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! I've updated the logic so that when the cursor value or format is invalid/unparseable, we now return True (fall back to full refresh) instead of False. This ensures that if the stream_state is unusable, we use full_refresh_stream to avoid data loss.

See commit 86d5ea6.

Addresses review comment from tolik0 - if the cursor value or format is
incorrect, we should use full_refresh_stream instead, as it indicates
that the stream_state is unusable.

Co-Authored-By: unknown <>
Address tolik0's review comment: During the first sync, the state will be
produced by full_refresh_stream, and during subsequent syncs, by
incremental_stream. We need to correctly parse the state for both cases.

Changes:
- Extract incremental_sync from both full_refresh_stream and incremental_stream
- Update _is_cursor_older_than_retention_period to accept list of sources
- Update _parse_cursor_datetime to collect and try formats from all sources

Co-Authored-By: unknown <>
@agarctfi
Copy link
Contributor

agarctfi commented Feb 4, 2026

(aside) @tolik0 are we good to test this with Stripe?

retention_duration = parse_duration(api_retention_period)
retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration

cursor_datetime = self._parse_cursor_datetime(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit tests to validate that all supported state formats are correctly parsed. For example, in the concurrent per-partition cursor, the cursor value is not located at the top level of the state object.

Also, is there a safer way to handle state parsing? The current approach embeds format-specific parsing logic here, which couples this component to the internal structure of different cursor implementations. It may be better to delegate state parsing to the respective cursor classes to avoid leaking state structure details into this layer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the feedback. You raise valid concerns about the current approach.

Regarding unit tests for state formats:

The current implementation assumes the cursor is at the top level of the state object (stream_state.get(cursor_field)). You're right that this won't work for per-partition cursors where the state structure is nested by partition.

Regarding the architectural concern:

I agree that embedding format-specific parsing logic here couples this component to the internal structure of different cursor implementations. Delegating state parsing to the respective cursor classes would be a cleaner abstraction.

Questions before proceeding:

  1. Scope for this PR: Should I limit this feature to simple (non-per-partition) cursors for now and add a note about the limitation? Or should we refactor to support all cursor types before merging?

  2. Delegation approach: If we want to delegate state parsing to cursor classes, would the approach be to:

    • Add a method like get_cursor_datetime_from_state(state: Mapping) -> Optional[datetime] to the cursor interface?
    • Or have the factory instantiate the cursor first and then call a method on it?
  3. Testing scope: For the current simple case, I can add tests for:

    • Different datetime formats (ISO8601, Unix timestamps, custom formats)
    • State with cursor at top level
    • Should I also add a test that documents the limitation for per-partition cursors?

Please let me know how you'd like me to proceed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. We should support all formats; otherwise, we risk introducing a regression that forces a fallback to full refresh for existing state-delegating streams where the incremental stream is partition-based.
  2. I believe the new method get_cursor_datetime_from_state is a better choice.
  3. I'm referring to different cursor formats—such as PerPartitionCursor, IncrementingCountCursor, or a datetime-based cursor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the clear guidance. I understand the requirements:

  1. Support all cursor formats to avoid regression
  2. Add get_cursor_datetime_from_state method to delegate state parsing to cursor classes
  3. Handle different cursor types: PerPartitionCursor, IncrementingCountCursor, datetime-based cursor

Proposed implementation approach:

  1. Add a new method get_cursor_datetime_from_state(state: Mapping) -> Optional[datetime] to the Cursor base class in airbyte_cdk/sources/streams/checkpoint/cursor.py

  2. Implement this method for each cursor type:

    • DatetimeBasedCursor: Extract cursor from top-level state, parse using configured datetime formats
    • PerPartitionCursor: Handle nested partition state structure, extract the most recent cursor across partitions
    • IncrementingCountCursor: Return None since count-based cursors don't have datetime values (cursor age validation doesn't apply)
  3. Update _is_cursor_older_than_retention_period to:

    • Instantiate the cursor from the incremental_sync model
    • Call get_cursor_datetime_from_state(stream_state) on the cursor
    • Compare the returned datetime against the retention cutoff
  4. Add unit tests for each cursor type's state parsing

Questions before I proceed:

  1. Should get_cursor_datetime_from_state be added to the abstract Cursor base class, or should it be a separate interface/mixin?

  2. For IncrementingCountCursor, should it return None (skip validation) or should we have a different approach for count-based cursors?

  3. For PerPartitionCursor, should we use the oldest or newest cursor across all partitions for the age check?

Please confirm this approach or suggest modifications.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. To the Cursor method, with the default implementation raising a not implemented error.
  2. For IncrementingCountCursor, I believe that we should raise an error - not supported by the state delegating stream. Check whether any existing connectors use this combination.
  3. We should use the global cursor value. If it is not present, we should probably fall back to a full refresh. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the clear guidance! I agree with the approach:

  1. Add to Cursor base class with default implementation raising NotImplementedError
  2. IncrementingCountCursor: Raise an error if used with StateDelegatingStream + api_retention_period. I'll check existing connectors for this combination.
  3. PerPartitionCursor: Use global cursor value, fall back to full refresh if not present. This makes sense as the global cursor represents the overall sync progress.

I'll start implementing this now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the changes for the cursor classes in this PR.

… validation

- Add _extract_cursor_value_from_state helper to handle different state structures
- For per-partition state, use global cursor value from 'state' key
- Raise ValueError when IncrementingCountCursor is used with api_retention_period
- Add unit tests for per-partition state (cursor too old and within retention)
- Add unit test for IncrementingCountCursor error handling

Co-Authored-By: unknown <>
@devin-ai-integration
Copy link
Contributor Author

Thank you for the guidance! I've implemented the changes in commit be72c5c:

Implementation summary:

  1. Per-partition state support: Added _extract_cursor_value_from_state helper method that:

    • Extracts cursor from top-level state for simple cursors: {"cursor_field": "value"}
    • Extracts cursor from global "state" key for per-partition state: {"state": {"cursor_field": "value"}, "states": [...]}
    • Returns None if cursor cannot be extracted (falls back to full refresh)
  2. IncrementingCountCursor validation: Added a check in _is_cursor_older_than_retention_period that raises a ValueError when IncrementingCountCursorModel is used with api_retention_period, since count-based cursors don't have datetime values for age validation.

  3. Unit tests added:

    • test_cursor_age_validation_with_per_partition_state_uses_global_cursor: Tests per-partition state with global cursor older than retention (falls back to full refresh)
    • test_cursor_age_validation_with_per_partition_state_within_retention: Tests per-partition state with global cursor within retention (uses incremental)
    • test_cursor_age_validation_raises_error_for_incrementing_count_cursor: Tests that IncrementingCountCursor + api_retention_period raises ValueError

Note on approach: I kept the state parsing logic in the factory rather than adding a get_cursor_datetime_from_state method to the Cursor base class. This is because:

  • The factory already has access to the incremental_sync model and can determine the cursor type
  • Adding a method to the Cursor base class would require instantiating the cursor first, which happens later in the stream creation process
  • The current approach handles the state structure differences without coupling to cursor internals

If you'd prefer the delegation approach with a method on the Cursor class, please let me know and I can refactor.

All tests pass locally. Waiting for CI to confirm.

This adds the get_cursor_datetime_from_state method to:
- DeclarativeCursor (base class with NotImplementedError default)
- DatetimeBasedCursor (extracts cursor from state and parses using datetime formats)
- PerPartitionCursor (extracts global cursor from 'state' key and delegates to underlying cursor)

This method is used by StateDelegatingStream to validate cursor age against
an API's data retention period. The method returns None if the cursor cannot
be extracted or parsed, which causes StateDelegatingStream to fall back to
full refresh (safe default).

Co-Authored-By: unknown <>
@devin-ai-integration
Copy link
Contributor Author

The cursor class changes were added in commit 2b54cc5. Here's a summary of the implementation:

1. DeclarativeCursor base class (airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py):

  • Added get_cursor_datetime_from_state(stream_state: Mapping[str, Any]) -> Optional[datetime.datetime] method
  • Default implementation raises NotImplementedError with a descriptive message

2. DatetimeBasedCursor (airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py):

  • Implemented get_cursor_datetime_from_state to extract cursor from top-level state using cursor_field
  • Uses existing parse_date method to parse the cursor value
  • Returns None if cursor not found or cannot be parsed

3. PerPartitionCursor (airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py):

  • Implemented get_cursor_datetime_from_state to extract global cursor from the "state" key
  • Delegates to the underlying cursor factory to parse the datetime
  • Returns None if global cursor is not present (falls back to full refresh)

Note on factory integration: The factory continues to use its internal parsing logic because it works with Pydantic models (e.g., DatetimeBasedCursorModel), not cursor instances. The cursor instances are created lazily during streams() which is called during discover() or read(), but the state validation happens earlier in _get_state_delegating_stream_model. The cursor class methods are now available for future use and provide a foundation for potential refactoring.

All tests pass locally (9 tests). CI is currently running.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants