
[SPARK-56346][SQL] Use PartitionPredicate in DSV2 Metadata Only Delete#55179

Open
szehon-ho wants to merge 1 commit into apache:master from szehon-ho:delete_partition_filter

Conversation

@szehon-ho
Member

Summary

When OptimizeMetadataOnlyDeleteFromTable fails to push standard V2 predicates (e.g. IN, STARTS_WITH) for a metadata-only delete, it now falls back to a second pass that:

  1. Converts partition-column filters to PartitionPredicates (reusing SPARK-55596 infrastructure)
  2. Translates remaining data-column filters to standard V2 predicates
  3. Combines them (partition predicates first) and calls table.canDeleteWhere

This mirrors the two-pass approach already used for scan filter pushdown in PushDownUtils.pushPartitionPredicates.
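The three steps above can be sketched in plain Scala. This is an illustrative model with stand-in types (`Filter`, `PartitionPredicate`, `V2Predicate` here are not the actual Spark classes), showing the partition/data split, the all-or-nothing data translation, and the partition-predicates-first ordering:

```scala
// Stand-in types for Spark's V2 Predicate and PartitionPredicate (illustrative only).
sealed trait Pred
case class PartitionPredicate(column: String, op: String) extends Pred
case class V2Predicate(op: String) extends Pred

case class Filter(column: String, op: String, isPartitionColumn: Boolean)

// Step 1: split filters into partition-column and data-column groups.
// Step 2: convert partition filters to PartitionPredicates.
// Step 3: translate remaining data filters; fail the whole pass if any
//         data filter cannot be translated (all-or-nothing semantics).
def secondPass(
    filters: Seq[Filter],
    translatable: String => Boolean): Option[Seq[Pred]] = {
  val (partFilters, dataFilters) = filters.partition(_.isPartitionColumn)
  val partPreds = partFilters.map(f => PartitionPredicate(f.column, f.op))
  if (dataFilters.forall(f => translatable(f.op))) {
    val dataPreds = dataFilters.map(f => V2Predicate(f.op))
    Some(partPreds ++ dataPreds) // partition predicates come first
  } else {
    None // fall back to a row-level delete plan
  }
}
```

If `secondPass` returns `Some`, the combined array would then be offered to `table.canDeleteWhere`.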

Changes

  • OptimizeMetadataOnlyDeleteFromTable: Added tryDeleteWithPartitionPredicates fallback method and tryTranslateToV2 helper
  • PushDownUtils: Extracted createPartitionPredicates and made flattenNestedPartitionFilters package-private for reuse; getPartitionPredicateSchema now returns None for empty partition fields
  • InMemoryTableWithV2Filter: Extracted evalPredicate to companion object for reuse by test tables
  • InMemoryPartitionPredicateDeleteTable (new): Test table supporting PartitionPredicates and configurable data predicate acceptance
  • DataSourceV2EnhancedDeleteFilterSuite (new): 9 test cases covering first-pass accept, second-pass accept/reject, mixed partition+data filters, UDF on non-contiguous partition columns, multiple PartitionPredicates, and row-level fallback
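A hedged sketch of the acceptance logic the new test table presumably implements: accept every partition predicate unconditionally, and accept data predicates only when a configurable flag allows it. The names and shapes below are illustrative, not the actual `InMemoryPartitionPredicateDeleteTable` code:

```scala
// Stand-in predicate types (not Spark's actual classes).
sealed trait TablePred
case class PartitionPred(column: String) extends TablePred
case class DataPred(column: String) extends TablePred

class StubPartitionPredicateDeleteTable(acceptDataPredicates: Boolean) {
  // Plays the role of SupportsDeleteV2.canDeleteWhere: a metadata-only
  // delete proceeds only if every pushed predicate is acceptable.
  def canDeleteWhere(predicates: Seq[TablePred]): Boolean =
    predicates.forall {
      case _: PartitionPred => true
      case _: DataPred      => acceptDataPredicates
    }
}
```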

Test plan

  • DataSourceV2EnhancedDeleteFilterSuite — 9/9 pass
  • DataSourceV2EnhancedPartitionFilterSuite — 19/19 pass (no regressions)
  • GroupBasedDeleteFromTableSuite — 32/32 pass (no regressions)
  • Scalastyle — 0 errors

When `OptimizeMetadataOnlyDeleteFromTable` fails to push standard V2 predicates for a metadata-only delete, it now falls back to a second pass that converts partition-column filters to `PartitionPredicate`s (SPARK-55596) and combines them with translated V2 data filters.
@szehon-ho force-pushed the delete_partition_filter branch from cb0ff92 to 33d100e (April 3, 2026 00:05)
```scala
/**
 * Evaluates a single V2 predicate by resolving column values through the
 * given function. Supports =, <=>, IS_NULL, IS_NOT_NULL, and ALWAYS_TRUE.
 */
def evalPredicate(
```
Member Author


just refactor for re-use in new test InMemoryTable

}
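The documented semantics of `evalPredicate` can be modeled in plain Scala. This is an illustrative re-implementation of the five supported operators with stub predicate types, not the `InMemoryTableWithV2Filter` code itself; `resolve` stands in for the column-resolution function:

```scala
// Stub predicate ADT covering the documented operators (illustrative names).
sealed trait Pred
case class Eq(col: String, value: Any) extends Pred         // =   (null-unsafe)
case class NullSafeEq(col: String, value: Any) extends Pred // <=> (null-safe)
case class IsNull(col: String) extends Pred
case class IsNotNull(col: String) extends Pred
case object AlwaysTrue extends Pred

def evalPredicate(pred: Pred, resolve: String => Any): Boolean = pred match {
  case Eq(col, v) =>
    val actual = resolve(col)
    actual != null && actual == v   // `null = x` does not match
  case NullSafeEq(col, v) =>
    resolve(col) == v               // `null <=> null` matches
  case IsNull(col)    => resolve(col) == null
  case IsNotNull(col) => resolve(col) != null
  case AlwaysTrue     => true
}
```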

/**
* Separates partition filters from data filters and converts pushable partition
Member Author


again, refactor for re-use in OptimizeMetadataOnlyDeleteQuery

```diff
  * Returns a map from flattened expression to original.
  */
-private def normalizeNestedPartitionFilters(
+private[v2] def flattenNestedPartitionFilters(
```
Member Author


rename, because 'normalize' is already used in OptimizeMetadataOnlyDelete
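The doc comment above says the method "returns a map from flattened expression to original". One plausible reading (an assumption, not verified against the Spark source) is that filters over nested struct fields are rewritten so each nested reference looks like a flat top-level column, with the map letting callers translate back. A minimal sketch under that assumption, with hypothetical `FieldRef`/`EqFilter` types:

```scala
// Hypothetical stand-ins: a (possibly nested) field reference and an equality filter on it.
case class FieldRef(path: Seq[String])        // e.g. FieldRef(Seq("s", "p1")) for s.p1
case class EqFilter(ref: FieldRef, value: Any)

// Flatten each nested reference to a single dotted name, keyed back to the original.
def flattenNestedPartitionFilters(filters: Seq[EqFilter]): Map[EqFilter, EqFilter] =
  filters.map { f =>
    val flat = f.copy(ref = FieldRef(Seq(f.ref.path.mkString("."))))
    flat -> f
  }.toMap
```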

Contributor

@cloud-fan left a comment


Summary

Prior state and problem: OptimizeMetadataOnlyDeleteFromTable could only perform metadata-only deletes when all filter expressions translated to standard V2 predicates (e.g., =, <=>, IS_NULL). Filters like IN, STARTS_WITH, or UDFs on partition columns caused a fallback to expensive row-level operations even though the table might accept PartitionPredicates.

Design approach: Add a second-pass fallback in the delete optimization rule that mirrors the existing two-pass approach in PushDownUtils.pushPartitionPredicates for scan filter pushdown. When the first pass (V2 translation) fails or is rejected, the second pass:

  1. Separates filters into partition-column and data-column categories
  2. Converts partition filters to PartitionPredicates via PartitionPredicateImpl
  3. Translates remaining data filters to standard V2 predicates
  4. Combines both and calls table.canDeleteWhere

Key design decisions:

  • No supportsIterativePushdown gate for the delete path (the scan path has one). This is intentional — canDeleteWhere already serves as the acceptance gate, and the supportsIterativePushdown opt-in is specific to ScanBuilder.
  • All-or-nothing semantics: if any remaining data filter can't translate to V2, the entire second pass fails and falls back to row-level. This differs from the scan path (which returns remaining filters for post-scan evaluation) because metadata-only deletes require complete filter acceptance.

Implementation sketch: OptimizeMetadataOnlyDeleteFromTable.apply → first tries tryTranslateToV2 (standard V2 path), on failure → tryDeleteWithPartitionPredicates (second pass via shared PushDownUtils.createPartitionPredicates and flattenNestedPartitionFilters), on failure → row-level plan. The shared methods were extracted from the existing pushPartitionPredicates and made package-private for reuse.
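The three-outcome chain in the implementation sketch above can be modeled as an `Option` fallback (an illustrative control-flow sketch; the names and `String` results are stand-ins, not Spark's types):

```scala
// Each pass either produces a pushed-down plan (Some) or declines (None).
def optimizeDelete(
    tryV2: () => Option[String],                  // first pass: standard V2 translation
    tryPartitionPredicates: () => Option[String]  // second pass: PartitionPredicates
): String =
  tryV2()
    .orElse(tryPartitionPredicates())
    .getOrElse("row-level delete plan")           // final fallback
```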

General comments

  • The logDebug message on the original first-pass success path was removed. With three possible outcomes now (first-pass V2, second-pass partition predicates, row-level fallback), adding logDebug for each path would help with debugging filter pushdown behavior.

```diff
 }
 if (fields.length == transforms.length) {
-  Some(fields.toSeq)
+  Some(fields.toSeq).filter(_.nonEmpty)
```
Contributor


This .filter(_.nonEmpty) guard is redundant: the outer check at line 139 guarantees transforms.nonEmpty, and fields.length == transforms.length at line 151 ensures fields is non-empty.

Suggested change

```diff
-Some(fields.toSeq).filter(_.nonEmpty)
+Some(fields.toSeq)
```

candidateKeys
}

// Handle data predicates (simulate data source with data column statistics)
Contributor


The comment says "data column statistics" but the code evaluates predicates row-by-row, not via statistics.

Suggested change

```diff
-// Handle data predicates (simulate data source with data column statistics)
+// Handle data predicates (simulate a data source applying row-level data filters)
```
