fix(cdk): fix SubstreamPartitionRouter updating cursor value when no records were read in partition#889
darynaishchenko wants to merge 7 commits into main
…re read in partition
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@daryna/fix-substream-partition-router#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch daryna/fix-substream-partition-router
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
📝 Walkthrough
Changed SubstreamPartitionRouter to add …
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Would you like me to generate a small focused patch-level checklist for reviewers (key lines/behaviors to verify), wdyt?
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1)
42-49: ⚠️ Potential issue | 🔴 Critical
Type annotation mismatch causing pipeline failure.
The pipeline is flagging a type error here. Yielding (None, True) produces tuple[None, bool], but the return type expects tuple[T, bool]. Since the sentinel introduces None as a possible value, would updating the return type to Optional[T] work for you, wdyt?
🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
+def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:
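To make the type mismatch concrete, here is a minimal sketch of a look-ahead helper with the widened signature. It assumes the helper emits a (None, True) sentinel for an empty input, as the comment above describes; the body is an illustration written for this thread and may differ from the CDK's actual implementation.

```python
from typing import Iterable, Iterator, Optional, Tuple, TypeVar

T = TypeVar("T")


def iterate_with_last_flag(generator: Iterable[T]) -> Iterator[Tuple[Optional[T], bool]]:
    """Yield (item, is_last) pairs; yield (None, True) when the input is empty."""
    iterator = iter(generator)
    try:
        current = next(iterator)
    except StopIteration:
        # Sentinel for an empty input. Because None can appear in the first
        # position, the item type has to be Optional[T] rather than T.
        yield None, True
        return

    # Look one item ahead so the final item can be flagged as last.
    for upcoming in iterator:
        yield current, False
        current = upcoming
    yield current, True
```

With this shape, every caller has to handle a None item before touching it, which is the reason the checker rejects the narrower tuple[T, bool] annotation.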
🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 210-211: In stream_slices() in substream_partition_router.py the
check "if partition is None: return" prematurely exits the entire method when a
parent_stream.generate_partitions() yields the (None, True) sentinel; change
this to "break" (or otherwise exit the inner partition loop) so we stop
iterating that parent stream's partitions and continue to the next
parent_stream_config instead of returning from stream_slices(), leaving the rest
of the parent streams to be processed; locate the partition handling inside the
loop that consumes parent_stream.generate_partitions() and replace the return
with a break (or equivalent loop-control) so behavior matches the original
semantics.
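The difference between return and break in this position can be shown with a small self-contained sketch. The names PARENTS and with_last_flag are hypothetical stand-ins for parent_stream_configs and the sentinel-emitting helper; none of this is CDK code.

```python
from typing import Dict, Iterator, List, Optional, Tuple

# Hypothetical stand-in for parent_stream_configs: parent name -> its partitions.
PARENTS: Dict[str, List[str]] = {"parent_a": [], "parent_b": ["b1", "b2"]}


def with_last_flag(items: List[str]) -> Iterator[Tuple[Optional[str], bool]]:
    """Simplified helper: emits a (None, True) sentinel when there are no items."""
    if not items:
        yield None, True
        return
    for index, item in enumerate(items):
        yield item, index == len(items) - 1


def slices_with_return() -> Iterator[str]:
    for partitions in PARENTS.values():
        for partition, _is_last in with_last_flag(partitions):
            if partition is None:
                return  # ends the whole generator: later parents are never visited
            yield partition


def slices_with_break() -> Iterator[str]:
    for partitions in PARENTS.values():
        for partition, _is_last in with_last_flag(partitions):
            if partition is None:
                break  # leaves only this parent's partition loop
            yield partition
```

In this sketch the first parent has no partitions, so the return variant yields nothing at all, while the break variant still yields the second parent's partitions.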
PyTest Results (Fast)
1 373 tests -2 482, 1 361 ✅ -2 482, 5m 12s ⏱️ -1m 15s
For more details on these failures, see this check.
Results for commit bdccfd7. ± Comparison against base commit 15542de.
This pull request removes 2482 tests.
♻️ This comment has been updated with latest results.
PyTest Results (Full)
3 866 tests +8, 3 853 ✅ +7, 11m 5s ⏱️ +4s
For more details on these failures, see this check.
Results for commit bdccfd7. ± Comparison against base commit 15542de.
♻️ This comment has been updated with latest results.
/prerelease
…titionRouter null handling Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
…ulti-parent behavior Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2)
42-49: ⚠️ Potential issue | 🟠 Major
Fix mypy type mismatch in iterate_with_last_flag.
Line 48 yields (None, True), but the signature advertises tuple[T, bool], which is tripping mypy. Could we widen the return type to Optional[T] to match the sentinel, wdyt?
🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
+def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:
186-187: ⚠️ Potential issue | 🟡 Minor
Update docstring to match empty-slice behavior.
Lines 186–187 still say an empty parent slice emits parent_record=None, but the code now skips emission. Could we adjust the docstring to reflect the new behavior, wdyt?
📝 Suggested doc tweak
- If a parent slice contains no record, emit a slice with parent_record=None.
+ If a parent slice contains no record, emit no slice.
    assert result == expected_output


def test_substream_partition_router_no_cursor_update_when_partition_has_no_records():
Please also add a test that validates that all parent stream partitions are properly closed, even if no parent records are produced for a given parent partition.
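One possible shape for such a test, sketched with hand-written fakes rather than the CDK's real classes. FakePartition, RecordingCursor, with_last_flag, and route are all hypothetical names introduced for illustration; route only mimics the observe / close_partition ordering discussed in this PR.

```python
from typing import Iterable, Iterator, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def with_last_flag(items: Iterable[T]) -> Iterator[Tuple[Optional[T], bool]]:
    """Look-ahead helper that emits (None, True) for an empty input."""
    iterator = iter(items)
    try:
        current = next(iterator)
    except StopIteration:
        yield None, True
        return
    for upcoming in iterator:
        yield current, False
        current = upcoming
    yield current, True


class FakePartition:
    """Hypothetical parent partition that returns a fixed list of records."""

    def __init__(self, name: str, records: List[str]) -> None:
        self.name = name
        self._records = records

    def read(self) -> Iterator[str]:
        return iter(self._records)


class RecordingCursor:
    """Hypothetical cursor that only records which calls it received."""

    def __init__(self) -> None:
        self.observed: List[str] = []
        self.closed: List[str] = []

    def observe(self, record: str) -> None:
        self.observed.append(record)

    def close_partition(self, partition: FakePartition) -> None:
        self.closed.append(partition.name)


def route(partitions: List[FakePartition], cursor: RecordingCursor) -> Iterator[str]:
    """Simplified stand-in for the router loop under test."""
    for partition, _is_last_slice in with_last_flag(partitions):
        if partition is None:
            break
        for record, is_last_record in with_last_flag(partition.read()):
            if record is not None:
                cursor.observe(record)
            if is_last_record:
                cursor.close_partition(partition)  # runs for empty partitions too
            if record is not None:
                yield record


def test_closes_all_partitions_even_when_no_records() -> None:
    cursor = RecordingCursor()
    partitions = [
        FakePartition("p1", ["r1"]),
        FakePartition("p2", []),  # produces no parent records
        FakePartition("p3", ["r2", "r3"]),
    ]
    emitted = list(route(partitions, cursor))
    assert emitted == ["r1", "r2", "r3"]
    assert cursor.observed == ["r1", "r2", "r3"]
    assert cursor.closed == ["p1", "p2", "p3"]
```

A real test would swap the fakes for the mocks already used in test_substream_partition_router.py; the assertion on the closed list is the part that answers the request above.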
… when no records Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
… empty generators Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 231-267: The try/except around dpath.get in
substream_partition_router should not use an early `continue` because that skips
the later partition closure logic; instead, introduce a boolean like `skip_slice
= True` when KeyError occurs (and only set `skip_slice = False` when extraction
succeeds), keep `extracted_extra_fields` in a safe default state, and let
execution fall through so the existing `if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)` / `if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()` blocks always run;
finally, guard the `yield StreamSlice(...)` (the emission that uses
`parent_record`, `partition_value`, `extracted_extra_fields`,
`parent_partition`, etc.) so it only occurs when `skip_slice` is False. This
ensures partition closure always executes while still skipping emission for
failed extractions.
    try:
        partition_value = dpath.get(
            record_data,  # type: ignore [arg-type]
            parent_field,
        )
    except KeyError:
        # FIXME a log here would go a long way for debugging
        continue

    # Add extra fields
    extracted_extra_fields = self._extract_extra_fields(
        record_data, extra_fields
    )

    if parent_stream_config.lazy_read_pointer:
        extracted_extra_fields = {
            "child_response": self._extract_child_response(
                record_data,
                parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
            ),
            **extracted_extra_fields,
        }

if is_last_record_in_slice:
    parent_stream.cursor.close_partition(partition)
    if is_last_slice:
        parent_stream.cursor.ensure_at_least_one_state_emitted()

yield StreamSlice(
    partition={
        partition_field: partition_value,
        "parent_slice": parent_partition or {},
    },
    cursor_slice={},
    extra_fields=extracted_extra_fields,
)
if parent_record is not None:
    yield StreamSlice(
        partition={
            partition_field: partition_value,
            "parent_slice": parent_partition or {},
        },
        cursor_slice={},
        extra_fields=extracted_extra_fields,
    )
Avoid skipping partition closure when partition-value extraction fails.
The early continue in the extraction-failure path skips close_partition() / ensure_at_least_one_state_emitted() when that record is last, leaving the cursor partition open. Could we switch to a skip_slice flag so closure always runs and only slice emission is skipped, wdyt?
🔧 Possible fix
 for parent_record, is_last_record_in_slice in iterate_with_last_flag(
     partition.read()
 ):
-    if parent_record is not None:
+    skip_slice = True
+    if parent_record is not None:
         # In the previous CDK implementation, state management was done internally by the stream.
         # However, this could cause issues when doing availability check for example as the availability
         # check would progress the state so state management was moved outside of the read method.
         # Hence, we need to call the cursor here.
         # Note that we call observe and close_partition before emitting the associated record as the
         # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
         # record was consumed.
         parent_stream.cursor.observe(parent_record)
         parent_partition = (
             parent_record.associated_slice.partition
             if parent_record.associated_slice
             else {}
         )
         record_data = parent_record.data
         try:
             partition_value = dpath.get(
                 record_data,  # type: ignore [arg-type]
                 parent_field,
             )
         except KeyError:
             # FIXME a log here would go a long way for debugging
-            continue
-
-        # Add extra fields
-        extracted_extra_fields = self._extract_extra_fields(
-            record_data, extra_fields
-        )
-
-        if parent_stream_config.lazy_read_pointer:
-            extracted_extra_fields = {
-                "child_response": self._extract_child_response(
-                    record_data,
-                    parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
-                ),
-                **extracted_extra_fields,
-            }
+            skip_slice = True
+        else:
+            skip_slice = False
+
+            # Add extra fields
+            extracted_extra_fields = self._extract_extra_fields(
+                record_data, extra_fields
+            )
+
+            if parent_stream_config.lazy_read_pointer:
+                extracted_extra_fields = {
+                    "child_response": self._extract_child_response(
+                        record_data,
+                        parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
+                    ),
+                    **extracted_extra_fields,
+                }
     if is_last_record_in_slice:
         parent_stream.cursor.close_partition(partition)
         if is_last_slice:
             parent_stream.cursor.ensure_at_least_one_state_emitted()
-    if parent_record is not None:
+    if not skip_slice:
         yield StreamSlice(
             partition={
                 partition_field: partition_value,
                 "parent_slice": parent_partition or {},
             },
             cursor_slice={},
             extra_fields=extracted_extra_fields,
         )
🤖 Prompt for AI Agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`
around lines 231 - 267, The try/except around dpath.get in
substream_partition_router should not use an early `continue` because that skips
the later partition closure logic; instead, introduce a boolean like `skip_slice
= True` when KeyError occurs (and only set `skip_slice = False` when extraction
succeeds), keep `extracted_extra_fields` in a safe default state, and let
execution fall through so the existing `if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)` / `if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()` blocks always run;
finally, guard the `yield StreamSlice(...)` (the emission that uses
`parent_record`, `partition_value`, `extracted_extra_fields`,
`parent_partition`, etc.) so it only occurs when `skip_slice` is False. This
ensures partition closure always executes while still skipping emission for
failed extractions.
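The control-flow point can be reproduced outside the CDK with a minimal sketch. Records are plain dicts, and appending to a closed list stands in for parent_stream.cursor.close_partition(partition); both function names are hypothetical.

```python
from typing import Any, Dict, Iterator, List


def slices_with_continue(
    records: List[Dict[str, Any]], parent_key: str, closed: List[str]
) -> Iterator[Any]:
    for index, record in enumerate(records):
        is_last_record = index == len(records) - 1
        try:
            value = record[parent_key]
        except KeyError:
            continue  # also skips the closure step below when this is the last record
        if is_last_record:
            closed.append("partition")  # stand-in for cursor.close_partition(partition)
        yield value


def slices_with_skip_flag(
    records: List[Dict[str, Any]], parent_key: str, closed: List[str]
) -> Iterator[Any]:
    for index, record in enumerate(records):
        is_last_record = index == len(records) - 1
        skip_slice = False
        try:
            value = record[parent_key]
        except KeyError:
            skip_slice = True  # remember the failure, but keep going
        if is_last_record:
            closed.append("partition")  # closure always runs
        if not skip_slice:
            yield value
```

When the last record lacks the parent key, the continue variant never reaches the closure step, while the flag variant closes the partition and only suppresses the slice for that record.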
Summary
This PR fixes an issue where `SubstreamPartitionRouter` would incorrectly update cursor values when no records were read in a partition. The fix adds defensive null-checking throughout `stream_slices()` to gracefully handle:
- Empty partition generators: `iterate_with_last_flag` now yields a `(None, True)` sentinel when the input generator is empty, and `stream_slices()` breaks out of the partition loop when partition is None (preserving multi-parent behavior).
- Partitions with no records: slice emission is skipped when there is no parent record (`if parent_record is not None`).
Updates since last revision
- Changed `if partition is None: return` to `if partition is None: break` per review feedback. Using `break` ensures that if one parent stream has no partitions, we continue processing subsequent `parent_stream_configs` rather than exiting the entire method.
- Added `test_substream_partition_router_closes_all_partitions_even_when_no_records` to verify that `cursor.close_partition()` is called for ALL partitions, including those with no records. This addresses the review feedback requesting validation of partition lifecycle management.
- Updated the `iterate_with_last_flag` return type from `tuple[T, bool]` to `tuple[T | None, bool]` to properly reflect that the function yields `(None, True)` as a sentinel for empty generators.
Review & Testing Checklist for Human
- `test_substream_slicer_parent_state_update_with_cursor` is failing on this branch but passes on main. The test expects `lookback_window: 0` but gets `lookback_window: 1`, and there's an extra `state` key in the output. This appears to be a pre-existing issue in the PR's original changes that needs investigation.
- Verify that `if partition is None: break` exits only the current parent's partition loop. Test with a connector that has multiple `parent_stream_configs` where one parent has no partitions to confirm subsequent parents are still processed. (Note: current tests only cover single-parent scenarios.)
- Test with a connector that uses `SubstreamPartitionRouter` where a parent stream returns empty partitions or partitions with no records.
- Run `poetry run pytest unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py -v`
Notes
Summary by CodeRabbit
New Features
Bug Fixes
Tests