[SPARK-56346][SQL] Use PartitionPredicate in DSV2 Metadata Only Delete#55179
szehon-ho wants to merge 1 commit into apache:master
Conversation
When `OptimizeMetadataOnlyDeleteFromTable` fails to push standard V2 predicates for a metadata-only delete, it now falls back to a second pass that converts partition-column filters to `PartitionPredicate`s (SPARK-55596) and combines them with translated V2 data filters.
Force-pushed: cb0ff92 to 33d100e
```scala
 * Evaluates a single V2 predicate by resolving column values through the
 * given function. Supports =, <=>, IS_NULL, IS_NOT_NULL, and ALWAYS_TRUE.
 */
def evalPredicate(
```
just refactor for re-use in new test InMemoryTable
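The extracted helper's behavior, as described in the doc comment above, can be sketched with a self-contained model (this is not Spark's actual `Predicate` API; the `Pred`/`Operand` types and the `resolve` function are invented here for illustration):

```scala
sealed trait Operand
final case class ColRef(name: String) extends Operand
final case class Literal(value: Any) extends Operand
final case class Pred(name: String, operands: Seq[Operand])

// Evaluates a single predicate by resolving column values through `resolve`;
// None models SQL NULL. Covers the operators listed in the doc comment.
def evalPredicate(p: Pred, resolve: String => Option[Any]): Boolean = {
  def value(o: Operand): Option[Any] = o match {
    case ColRef(n)  => resolve(n)
    case Literal(v) => Option(v)
  }
  p.name match {
    case "ALWAYS_TRUE" => true
    case "IS_NULL"     => value(p.operands.head).isEmpty
    case "IS_NOT_NULL" => value(p.operands.head).isDefined
    case "=" =>                     // NULL on either side is not a match
      (value(p.operands(0)), value(p.operands(1))) match {
        case (Some(a), Some(b)) => a == b
        case _                  => false
      }
    case "<=>" =>                   // null-safe equality: NULL <=> NULL is true
      value(p.operands(0)) == value(p.operands(1))
    case other => sys.error(s"unsupported predicate: $other")
  }
}
```

A row can then be modelled as a `Map[String, Any]` and passed in as `evalPredicate(Pred("=", Seq(ColRef("p"), Literal(1))), row.get)`.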
```scala
}

/**
 * Separates partition filters from data filters and converts pushable partition
```
again, refactor for re-use in OptimizeMetadataOnlyDeleteQuery
```diff
  * Returns a map from flattened expression to original.
  */
- private def normalizeNestedPartitionFilters(
+ private[v2] def flattenNestedPartitionFilters(
```
rename, because 'normalize' is already used in OptimizeMetadataOnlyDelete
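For context, the "map from flattened expression to original" shape mentioned in the doc comment can be illustrated with a toy model (the `Col`/`Filt` types and the dot-joined flattening are assumptions made here for illustration; the real method operates on catalyst expressions):

```scala
// Toy model of a filter over a possibly nested column path.
final case class Col(path: Seq[String])
final case class Filt(col: Col, op: String, value: Any)

// Rewrites each nested path (e.g. Seq("part", "region")) into a single
// dot-joined top-level name, and returns a map from the flattened filter
// back to the original filter it came from.
def flattenNestedPartitionFilters(filters: Seq[Filt]): Map[Filt, Filt] =
  filters.map { f =>
    f.copy(col = Col(Seq(f.col.path.mkString(".")))) -> f
  }.toMap
```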
cloud-fan left a comment
Summary
Prior state and problem: OptimizeMetadataOnlyDeleteFromTable could only perform metadata-only deletes when all filter expressions translated to standard V2 predicates (e.g., =, <=>, IS_NULL). Filters like IN, STARTS_WITH, or UDFs on partition columns caused a fallback to expensive row-level operations even though the table might accept PartitionPredicates.
Design approach: Add a second-pass fallback in the delete optimization rule that mirrors the existing two-pass approach in `PushDownUtils.pushPartitionPredicates` for scan filter pushdown. When the first pass (V2 translation) fails or is rejected, the second pass:
- Separates filters into partition-column and data-column categories
- Converts partition filters to `PartitionPredicate`s via `PartitionPredicateImpl`
- Translates remaining data filters to standard V2 predicates
- Combines both and calls `table.canDeleteWhere`
Key design decisions:
- No `supportsIterativePushdown` gate for the delete path (the scan path has one). This is intentional: `canDeleteWhere` already serves as the acceptance gate, and the `supportsIterativePushdown` opt-in is specific to `ScanBuilder`.
- All-or-nothing semantics: if any remaining data filter can't translate to V2, the entire second pass fails and falls back to row-level. This differs from the scan path (which returns remaining filters for post-scan evaluation) because metadata-only deletes require complete filter acceptance.
Implementation sketch: `OptimizeMetadataOnlyDeleteFromTable.apply` → first tries `tryTranslateToV2` (standard V2 path), on failure → `tryDeleteWithPartitionPredicates` (second pass via shared `PushDownUtils.createPartitionPredicates` and `flattenNestedPartitionFilters`), on failure → row-level plan. The shared methods were extracted from the existing `pushPartitionPredicates` and made package-private for reuse.
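The three-outcome flow can be sketched end-to-end with plain stubs. Everything below (the `Filters` holder, the toy `translateToV2` that rejects UDF-based filters, the string "predicates") is invented to show the control flow, not Spark code:

```scala
// Toy filter sets: partition-column filters vs data-column filters.
final case class Filters(partition: Seq[String], data: Seq[String])

// Stand-in for V2 translation: pretend anything UDF-based is untranslatable.
def translateToV2(f: String): Option[String] =
  if (f.startsWith("udf")) None else Some(s"v2($f)")

def toPartitionPredicate(f: String): String = s"pp($f)"

// First pass: every filter must translate to a standard V2 predicate.
def firstPass(all: Seq[String]): Option[Seq[String]] = {
  val translated = all.flatMap(translateToV2)
  if (translated.length == all.length) Some(translated) else None
}

// Second pass: partition filters become PartitionPredicates; the remaining
// data filters must ALL translate (all-or-nothing), per the design above.
def secondPass(fs: Filters): Option[Seq[String]] = {
  val dataV2 = fs.data.flatMap(translateToV2)
  if (dataV2.length == fs.data.length)
    Some(fs.partition.map(toPartitionPredicate) ++ dataV2)
  else None
}

// The rule's shape: first pass, then second pass, then row-level fallback.
def planDelete(fs: Filters): String =
  firstPass(fs.partition ++ fs.data)
    .orElse(secondPass(fs))
    .map(ps => s"metadata-only delete with ${ps.size} predicates")
    .getOrElse("row-level fallback")
```

With these stubs, a UDF on a partition column succeeds only through the second pass, while an untranslatable data filter forces the row-level fallback.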
General comments
- The `logDebug` message on the original first-pass success path was removed. With three possible outcomes now (first-pass V2, second-pass partition predicates, row-level fallback), adding a `logDebug` for each path would help with debugging filter pushdown behavior.
```diff
  }
  if (fields.length == transforms.length) {
-   Some(fields.toSeq)
+   Some(fields.toSeq).filter(_.nonEmpty)
```
This .filter(_.nonEmpty) guard is redundant: the outer check at line 139 guarantees transforms.nonEmpty, and fields.length == transforms.length at line 151 ensures fields is non-empty.
```diff
- Some(fields.toSeq).filter(_.nonEmpty)
+ Some(fields.toSeq)
```
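The reviewer's point can be checked mechanically: when the wrapped sequence is guaranteed non-empty, `Option.filter(_.nonEmpty)` is an identity. A minimal standalone check (the `transforms`/`fields` values are stand-ins for the guarded state described above):

```scala
// Stand-ins mirroring the guards described above: transforms is non-empty,
// and fields has the same length, so fields is necessarily non-empty.
val transforms = Seq("bucket(p1)", "years(p2)")
val fields = Seq("p1", "p2")
assert(transforms.nonEmpty && fields.length == transforms.length)

// For a non-empty fields, the .filter(_.nonEmpty) guard never removes anything.
assert(Some(fields).filter(_.nonEmpty) == Some(fields))
```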
```scala
  candidateKeys
}

// Handle data predicates (simulate data source with data column statistics)
```
The comment says "data column statistics" but the code evaluates predicates row-by-row, not via statistics.
```diff
- // Handle data predicates (simulate data source with data column statistics)
+ // Handle data predicates (simulate a data source applying row-level data filters)
```
Summary
When `OptimizeMetadataOnlyDeleteFromTable` fails to push standard V2 predicates (e.g. `IN`, `STARTS_WITH`) for a metadata-only delete, it now falls back to a second pass that:
- converts partition-column filters to `PartitionPredicate`s (reusing SPARK-55596 infrastructure)
- translates the remaining data filters to standard V2 predicates
- combines both and calls `table.canDeleteWhere`

This mirrors the two-pass approach already used for scan filter pushdown in `PushDownUtils.pushPartitionPredicates`.

Changes
- `OptimizeMetadataOnlyDeleteFromTable`: Added `tryDeleteWithPartitionPredicates` fallback method and `tryTranslateToV2` helper
- `PushDownUtils`: Extracted `createPartitionPredicates` and made `flattenNestedPartitionFilters` package-private for reuse; `getPartitionPredicateSchema` now returns `None` for empty partition fields
- `InMemoryTableWithV2Filter`: Extracted `evalPredicate` to companion object for reuse by test tables
- `InMemoryPartitionPredicateDeleteTable` (new): Test table supporting `PartitionPredicate`s and configurable data predicate acceptance
- `DataSourceV2EnhancedDeleteFilterSuite` (new): 9 test cases covering first-pass accept, second-pass accept/reject, mixed partition+data filters, UDF on non-contiguous partition columns, multiple PartitionPredicates, and row-level fallback

Test plan
- `DataSourceV2EnhancedDeleteFilterSuite`: 9/9 pass
- `DataSourceV2EnhancedPartitionFilterSuite`: 19/19 pass (no regressions)
- `GroupBasedDeleteFromTableSuite`: 32/32 pass (no regressions)