Skip to content

fix(cdk): fix SubstreamPartitionRouter updating cursor value when no records were read in partition#889

Open
darynaishchenko wants to merge 7 commits intomainfrom
daryna/fix-substream-partition-router
Open

fix(cdk): fix SubstreamPartitionRouter updating cursor value when no records were read in partition#889
darynaishchenko wants to merge 7 commits intomainfrom
daryna/fix-substream-partition-router

Conversation

@darynaishchenko
Copy link
Contributor

@darynaishchenko darynaishchenko commented Feb 2, 2026

Summary

This PR fixes an issue where SubstreamPartitionRouter would incorrectly update cursor values when no records were read in a partition. The fix adds defensive null-checking throughout stream_slices() to gracefully handle:

  1. Empty partition generators - iterate_with_last_flag now yields (None, True) sentinel when the input generator is empty, and stream_slices() breaks out of the partition loop when partition is None (preserving multi-parent behavior)
  2. Empty record iterators - When a partition has no records, cursor observation and slice emission are skipped (guarded by if parent_record is not None)

Updates since last revision

  • Changed if partition is None: return to if partition is None: break per review feedback. Using break ensures that if one parent stream has no partitions, we continue processing subsequent parent_stream_configs rather than exiting the entire method.
  • Added test_substream_partition_router_closes_all_partitions_even_when_no_records to verify that cursor.close_partition() is called for ALL partitions, including those with no records. This addresses the review feedback requesting validation of partition lifecycle management.
  • Fixed MyPy type error: Updated iterate_with_last_flag return type from tuple[T, bool] to tuple[T | None, bool] to properly reflect that the function yields (None, True) as a sentinel for empty generators.

Review & Testing Checklist for Human

  • ⚠️ INVESTIGATE TEST FAILURE: test_substream_slicer_parent_state_update_with_cursor is failing on this branch but passes on main. The test expects lookback_window: 0 but gets lookback_window: 1, and there's an extra state key in the output. This appears to be a pre-existing issue in the PR's original changes that needs investigation.
  • Verify multi-parent behavior: The if partition is None: break exits only the current parent's partition loop. Test with a connector that has multiple parent_stream_configs where one parent has no partitions to confirm subsequent parents are still processed. (Note: current tests only cover single-parent scenarios)
  • Test with real connector: Test with a connector using SubstreamPartitionRouter where a parent stream returns empty partitions or partitions with no records
  • Run full test suite: poetry run pytest unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py -v

Notes

Summary by CodeRabbit

  • New Features

    • Added an iterator utility that flags the final element and can yield a None marker on exhaustion.
  • Bug Fixes

    • Prevented emitting slices for empty/None partitions and added early-exit when inputs are exhausted.
    • Guarded partition extraction and cursor/state updates so they run only when a valid parent record exists.
    • Ensured cursors/state are closed correctly for partitions with no records without advancing state.
  • Tests

    • Added unit tests covering the new utility and edge-case partition behaviors.

@darynaishchenko darynaishchenko self-assigned this Feb 2, 2026
@github-actions
Copy link

github-actions bot commented Feb 2, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@daryna/fix-substream-partition-router#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch daryna/fix-substream-partition-router

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 2, 2026

📝 Walkthrough

Walkthrough

Changed SubstreamPartitionRouter to add iterate_with_last_flag (now yields (T | None, bool)), handle exhausted generators by yielding (None, True), and guard all parent_record-dependent processing (cursor observation, partition tracking, partition_value/extra_fields/lazy pointer extraction, and slice emission) behind non-None checks; outer loop now breaks early on None partitions.

Changes

Cohort / File(s) Summary
Substream partition router
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py
Added iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T | None, bool]]; when input is exhausted it yields (None, True); guarded processing that depends on parent_record (cursor/state observation, partition association, partition_value extraction via dpath, extra_fields, lazy_read_pointer), preserved KeyError handling inside guarded path, emit StreamSlice only for non-None parent_record, and break outer loop early if a partition is None.
Unit tests for edge cases
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
Added tests importing iterate_with_last_flag; new tests cover (item, is_last) semantics for empty/single/multiple inputs and SubstreamPartitionRouter behaviors: no slice/no cursor update for partitions with no records, handling empty parent partitions (early exit), and ensuring close_partition is called for all partitions even when no records are emitted.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Would you like me to generate a small focused patch-level checklist for reviewers (key lines/behaviors to verify), wdyt?

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 71.43% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly and specifically describes the main fix: preventing incorrect cursor updates when SubstreamPartitionRouter encounters partitions with no records.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch daryna/fix-substream-partition-router

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1)

42-49: ⚠️ Potential issue | 🔴 Critical

Type annotation mismatch causing pipeline failure.

The pipeline is flagging a type error here. Yielding (None, True) produces tuple[None, bool], but the return type expects tuple[T, bool]. Since the sentinel introduces None as a possible value, would updating the return type to Optional[T] work for you, wdyt?

🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
+def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:
🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 210-211: In stream_slices() in substream_partition_router.py the
check "if partition is None: return" prematurely exits the entire method when a
parent_stream.generate_partitions() yields the (None, True) sentinel; change
this to "break" (or otherwise exit the inner partition loop) so we stop
iterating that parent stream's partitions and continue to the next
parent_stream_config instead of returning from stream_slices(), leaving the rest
of the parent streams to be processed; locate the partition handling inside the
loop that consumes parent_stream.generate_partitions() and replace the return
with a break (or equivalent loop-control) so behavior matches the original
semantics.

@github-actions
Copy link

github-actions bot commented Feb 2, 2026

PyTest Results (Fast)

1 373 tests   - 2 482   1 361 ✅  - 2 482   5m 12s ⏱️ - 1m 15s
    1 suites ±    0      11 💤  -     1 
    1 files   ±    0       1 ❌ +    1 

For more details on these failures, see this check.

Results for commit bdccfd7. ± Comparison against base commit 15542de.

This pull request removes 2482 tests.
unit_tests.sources.declarative.partition_routers.test_substream_partition_router ‑ test_cartesian_product_stream_slicer_warning_log_message[stream_slicers0-False]
unit_tests.sources.declarative.partition_routers.test_substream_partition_router ‑ test_cartesian_product_stream_slicer_warning_log_message[stream_slicers1-True]
unit_tests.sources.declarative.partition_routers.test_substream_partition_router ‑ test_cartesian_product_stream_slicer_warning_log_message[stream_slicers2-True]
unit_tests.sources.declarative.partition_routers.test_substream_partition_router ‑ test_request_params_interpolation_for_parent_stream[config_interpolation]
unit_tests.sources.declarative.partition_routers.test_substream_partition_router ‑ test_request_params_interpolation_for_parent_stream[parameters_interpolation]
unit_tests.sources.declarative.partition_routers.test_substream_partition_router ‑ test_substream_checkpoints_after_each_parent_partition
unit_tests.sources.declarative.partition_routers.test_substream_partition_router ‑ test_substream_partition_router_with_extra_keys[test_with_nested_extra_keys]
unit_tests.sources.declarative.partition_routers.test_substream_partition_router ‑ test_substream_partition_router_with_extra_keys[test_with_single_extra_key]
unit_tests.sources.declarative.partition_routers.test_substream_partition_router ‑ test_substream_using_incremental_parent_stream
unit_tests.sources.declarative.requesters.error_handlers.backoff_strategies.test_constant_backoff ‑ test_constant_backoff[test_constant_backoff_attempt_round_float-1.0-6.7-6.7]
…

♻️ This comment has been updated with latest results.

@github-actions
Copy link

github-actions bot commented Feb 2, 2026

PyTest Results (Full)

3 866 tests  +8   3 853 ✅ +7   11m 5s ⏱️ +4s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       1 ❌ +1 

For more details on these failures, see this check.

Results for commit bdccfd7. ± Comparison against base commit 15542de.

♻️ This comment has been updated with latest results.

@darynaishchenko
Copy link
Contributor Author

darynaishchenko commented Feb 2, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/21601466980

devin-ai-integration bot and others added 2 commits February 3, 2026 15:38
…titionRouter null handling

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
…ulti-parent behavior

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2)

42-49: ⚠️ Potential issue | 🟠 Major

Fix mypy type mismatch in iterate_with_last_flag.
Line 48 yields (None, True), but the signature advertises tuple[T, bool], which is tripping mypy. Could we widen the return type to Optional[T] to match the sentinel, wdyt?

🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
+def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:

186-187: ⚠️ Potential issue | 🟡 Minor

Update docstring to match empty-slice behavior.
Lines 186–187 still say an empty parent slice emits parent_record=None, but the code now skips emission. Could we adjust the docstring to reflect the new behavior, wdyt?

📝 Suggested doc tweak
-        If a parent slice contains no record, emit a slice with parent_record=None.
+        If a parent slice contains no record, emit no slice.

assert result == expected_output


def test_substream_partition_router_no_cursor_update_when_partition_has_no_records():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add a test that validates that all parent stream partitions are properly closed, even if no parent records are produced for a given parent partition.

devin-ai-integration bot and others added 3 commits February 5, 2026 10:36
… when no records

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
… empty generators

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 231-267: The try/except around dpath.get in
substream_partition_router should not use an early `continue` because that skips
the later partition closure logic; instead, introduce a boolean like `skip_slice
= True` when KeyError occurs (and only set `skip_slice = False` when extraction
succeeds), keep `extracted_extra_fields` in a safe default state, and let
execution fall through so the existing `if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)` / `if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()` blocks always run;
finally, guard the `yield StreamSlice(...)` (the emission that uses
`parent_record`, `partition_value`, `extracted_extra_fields`,
`parent_partition`, etc.) so it only occurs when `skip_slice` is False. This
ensures partition closure always executes while still skipping emission for
failed extractions.

Comment on lines +231 to +267
try:
partition_value = dpath.get(
record_data, # type: ignore [arg-type]
parent_field,
)
except KeyError:
# FIXME a log here would go a long way for debugging
continue

# Add extra fields
extracted_extra_fields = self._extract_extra_fields(
record_data, extra_fields
)

if parent_stream_config.lazy_read_pointer:
extracted_extra_fields = {
"child_response": self._extract_child_response(
record_data,
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
),
**extracted_extra_fields,
}

if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)
if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()

yield StreamSlice(
partition={
partition_field: partition_value,
"parent_slice": parent_partition or {},
},
cursor_slice={},
extra_fields=extracted_extra_fields,
)
if parent_record is not None:
yield StreamSlice(
partition={
partition_field: partition_value,
"parent_slice": parent_partition or {},
},
cursor_slice={},
extra_fields=extracted_extra_fields,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid skipping partition closure when partition-value extraction fails.

The early continue in the extraction-failure path skips close_partition() / ensure_at_least_one_state_emitted() when that record is last, leaving the cursor partition open. Could we switch to a skip_slice flag so closure always runs and only slice emission is skipped, wdyt?

🔧 Possible fix
                     for parent_record, is_last_record_in_slice in iterate_with_last_flag(
                         partition.read()
                     ):
-                        if parent_record is not None:
+                        skip_slice = True
+                        if parent_record is not None:
                             # In the previous CDK implementation, state management was done internally by the stream.
                             # However, this could cause issues when doing availability check for example as the availability
                             # check would progress the state so state management was moved outside of the read method.
                             # Hence, we need to call the cursor here.
                             # Note that we call observe and close_partition before emitting the associated record as the
                             # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
                             # record was consumed.
                             parent_stream.cursor.observe(parent_record)
                             parent_partition = (
                                 parent_record.associated_slice.partition
                                 if parent_record.associated_slice
                                 else {}
                             )
                             record_data = parent_record.data
 
                             try:
                                 partition_value = dpath.get(
                                     record_data,  # type: ignore [arg-type]
                                     parent_field,
                                 )
                             except KeyError:
                                 # FIXME a log here would go a long way for debugging
-                                continue
-
-                            # Add extra fields
-                            extracted_extra_fields = self._extract_extra_fields(
-                                record_data, extra_fields
-                            )
-
-                            if parent_stream_config.lazy_read_pointer:
-                                extracted_extra_fields = {
-                                    "child_response": self._extract_child_response(
-                                        record_data,
-                                        parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
-                                    ),
-                                    **extracted_extra_fields,
-                                }
+                                skip_slice = True
+                            else:
+                                skip_slice = False
+
+                                # Add extra fields
+                                extracted_extra_fields = self._extract_extra_fields(
+                                    record_data, extra_fields
+                                )
+
+                                if parent_stream_config.lazy_read_pointer:
+                                    extracted_extra_fields = {
+                                        "child_response": self._extract_child_response(
+                                            record_data,
+                                            parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
+                                        ),
+                                        **extracted_extra_fields,
+                                    }
 
                         if is_last_record_in_slice:
                             parent_stream.cursor.close_partition(partition)
                             if is_last_slice:
                                 parent_stream.cursor.ensure_at_least_one_state_emitted()
 
-                        if parent_record is not None:
+                        if not skip_slice:
                             yield StreamSlice(
                                 partition={
                                     partition_field: partition_value,
                                     "parent_slice": parent_partition or {},
                                 },
                                 cursor_slice={},
                                 extra_fields=extracted_extra_fields,
                             )
🤖 Prompt for AI Agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`
around lines 231 - 267, The try/except around dpath.get in
substream_partition_router should not use an early `continue` because that skips
the later partition closure logic; instead, introduce a boolean like `skip_slice
= True` when KeyError occurs (and only set `skip_slice = False` when extraction
succeeds), keep `extracted_extra_fields` in a safe default state, and let
execution fall through so the existing `if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)` / `if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()` blocks always run;
finally, guard the `yield StreamSlice(...)` (the emission that uses
`parent_record`, `partition_value`, `extracted_extra_fields`,
`parent_partition`, etc.) so it only occurs when `skip_slice` is False. This
ensures partition closure always executes while still skipping emission for
failed extractions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants