30 changes: 30 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -375,6 +375,36 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.native.shuffle.partitioning.roundrobin.enabled")
.category(CATEGORY_SHUFFLE)
.doc(
"Whether to enable round robin partitioning for Comet native shuffle. " +
"This is disabled by default because Comet's round-robin produces different " +
"partition assignments than Spark. Spark sorts rows by their binary UnsafeRow " +
"representation before assigning partitions, but Comet uses Arrow format which " +
"has a different binary layout. Instead, Comet implements round-robin as hash " +
"partitioning on all columns, which achieves the same goals: even distribution, " +
"deterministic output (for fault tolerance), and no semantic grouping. " +
"Sorted output will be identical to Spark, but unsorted row ordering may differ.")
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_WITH_ROUND_ROBIN_PARTITIONING_MAX_HASH_COLUMNS: ConfigEntry[Int] =
conf("spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns")
.category(CATEGORY_SHUFFLE)
.doc(
"The maximum number of columns to hash for round robin partitioning. " +
"When set to 0 (the default), all columns are hashed. " +
"When set to a positive value, only the first N columns are used for hashing, " +
"which can improve performance for wide tables while still providing " +
"reasonable distribution.")
.intConf
.checkValue(
v => v >= 0,
"The maximum number of columns to hash for round robin partitioning must be non-negative.")
.createWithDefault(0)
Member: add checkValue:

.checkValue(v => v >= 0, "The maximum number of columns to hash for round robin partitioning must be non-negative.")

Member Author: Added

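For context, a minimal sketch of setting these options from a Spark session (the property names come from the config entries above; `df` and the values shown are illustrative):

```scala
// Enable Comet's hash-based round-robin for native shuffle (off by default).
spark.conf.set("spark.comet.native.shuffle.partitioning.roundrobin.enabled", "true")

// Optionally cap how many leading columns are hashed for very wide tables
// (0, the default, hashes all columns).
spark.conf.set("spark.comet.native.shuffle.partitioning.roundrobin.maxHashColumns", "4")

// With the flag enabled, df.repartition(n) becomes eligible for Comet native
// shuffle instead of falling back to JVM columnar shuffle.
val repartitioned = df.repartition(8)
```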
val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
.category(CATEGORY_SHUFFLE)
8 changes: 3 additions & 5 deletions docs/source/contributor-guide/jvm_shuffle.md
@@ -46,12 +46,10 @@ JVM shuffle (`CometColumnarExchange`) is used instead of native shuffle (`CometE
(not a `CometPlan`), JVM shuffle is the only option since native shuffle requires columnar input
from Comet operators.

3. **Unsupported partitioning type**: Native shuffle only supports `HashPartitioning`, `RangePartitioning`,
and `SinglePartition`. JVM shuffle additionally supports `RoundRobinPartitioning`.

4. **Unsupported partition key types**: For `HashPartitioning` and `RangePartitioning`, native shuffle
3. **Unsupported partition key types**: For `HashPartitioning` and `RangePartitioning`, native shuffle
only supports primitive types as partition keys. Complex types (struct, array, map) cannot be used
as partition keys in native shuffle, though they are fully supported as data columns in both implementations.
as partition keys in native shuffle; such plans fall back to JVM columnar shuffle. Note that complex types are
fully supported as data columns in both implementations.
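A hedged illustration of this rule, assuming a DataFrame `df` with a struct column `structCol` and a primitive column `id` (names are illustrative):

```scala
import org.apache.spark.sql.functions.col

// Complex-typed partition key: not supported by native shuffle, so this
// exchange falls back to JVM columnar shuffle.
val byStruct = df.repartition(8, col("structCol"))

// Primitive partition key: eligible for native shuffle, assuming the other
// conditions for native shuffle are also met.
val byId = df.repartition(8, col("id"))
```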

## Input Handling

22 changes: 17 additions & 5 deletions docs/source/contributor-guide/native_shuffle.md
@@ -52,8 +52,7 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
- `HashPartitioning`
- `RangePartitioning`
- `SinglePartition`

`RoundRobinPartitioning` requires JVM shuffle.
- `RoundRobinPartitioning`

4. **Supported partition key types**: For `HashPartitioning` and `RangePartitioning`, partition
keys must be primitive types. Complex types (struct, array, map) as partition keys require
@@ -131,7 +130,7 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
2. **Native execution**: `CometExec.getCometIterator()` executes the plan in Rust.

3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner:
- `MultiPartitionShuffleRepartitioner`: For hash/range partitioning
- `MultiPartitionShuffleRepartitioner`: For hash/range/round-robin partitioning
- `SinglePartitionShufflePartitioner`: For single partition (simpler path)

4. **Buffering and spilling**: The partitioner buffers rows per partition. When memory pressure
@@ -187,6 +186,19 @@ For range partitioning:
The simplest case: all rows go to partition 0. Uses `SinglePartitionShufflePartitioner` which
simply concatenates batches to reach the configured batch size.

### Round Robin Partitioning

Comet implements round robin partitioning using hash-based assignment for determinism:

1. Computes a Murmur3 hash of columns (using seed 42)
2. Assigns partitions directly using the hash: `partition_id = hash % num_partitions`

This approach guarantees determinism across retries, which is critical for fault tolerance.
However, unlike true round robin which cycles through partitions row-by-row, hash-based
assignment only provides even distribution when the data has sufficient variation in the
hashed columns. Data with low cardinality or identical values may result in skewed partition
sizes.
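
The assignment step can be sketched as follows (illustrative Scala, not the actual Rust code): given a Murmur3 hash of the row's columns seeded with 42, the partition is the non-negative modulus of that hash.

```scala
// Illustrative sketch of the hash-based "round robin" assignment; the real
// implementation lives in the Rust shuffle writer.
def assignPartition(murmur3Hash: Int, numPartitions: Int): Int = {
  // pmod: keep the result non-negative even when the hash is negative
  val m = murmur3Hash % numPartitions
  if (m < 0) m + numPartitions else m
}
```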

## Memory Management

Native shuffle uses DataFusion's memory management with spilling support:
@@ -235,8 +247,8 @@ independently compressed, allowing parallel decompression during reads.
| ------------------- | -------------------------------------- | --------------------------------- |
| Input format | Columnar (direct from Comet operators) | Row-based (via ColumnarToRowExec) |
| Partitioning logic | Rust implementation | Spark's partitioner |
| Supported schemes | Hash, Range, Single | Hash, Range, Single, RoundRobin |
| Partition key types | Primitives only | Any type |
| Supported schemes | Hash, Range, Single, RoundRobin | Hash, Range, Single, RoundRobin |
| Partition key types | Primitives only (Hash, Range) | Any type |
| Performance | Higher (no format conversion) | Lower (columnar→row→columnar) |
| Writer variants | Single path | Bypass (hash) and sort-based |

26 changes: 26 additions & 0 deletions docs/source/user-guide/latest/compatibility.md
@@ -69,6 +69,32 @@ this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
Comet's support for window functions is incomplete and known to be incorrect. It is disabled by default and
should not be used in production. The feature will be enabled in a future release. Tracking issue: [#2721](https://github.com/apache/datafusion-comet/issues/2721).

## Round-Robin Partitioning

Comet's native shuffle implementation of round-robin partitioning (`df.repartition(n)`) is not compatible with
Spark's implementation and is disabled by default. It can be enabled by setting
`spark.comet.native.shuffle.partitioning.roundrobin.enabled=true`.

**Why the incompatibility exists:**

Spark's round-robin partitioning sorts rows by their binary `UnsafeRow` representation before assigning them to
partitions. This ensures deterministic output for fault tolerance (task retries produce identical results).
Comet uses Arrow format internally, which has a completely different binary layout than `UnsafeRow`, making it
impossible to match Spark's exact partition assignments.

**Comet's approach:**

Instead of true round-robin assignment, Comet implements round-robin as hash partitioning on ALL columns. This
achieves the same semantic goals:

- **Even distribution**: Rows are distributed evenly across partitions, provided the hashed values vary
  sufficiently; data with low cardinality or many identical rows can produce skew
- **Deterministic**: Same input always produces the same partition assignments (important for fault tolerance)
- **No semantic grouping**: Unlike hash partitioning on specific columns, this doesn't group related rows together

The only difference is that Comet's partition assignments will differ from Spark's. When results are sorted,
they will be identical to Spark. Unsorted results may have different row ordering.
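
As a rough Spark-level analogy (an approximation for intuition, not the exact native code path), the assignment behaves like hashing every column with Spark's Murmur3-based `hash` function and taking a positive modulus; `df` here is an assumed DataFrame:

```scala
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Approximate Comet's round-robin assignment: hash all columns (Spark's hash()
// uses Murmur3 with seed 42) and take the positive modulus.
val numPartitions = 8
val withPartitionId = df.withColumn(
  "partition_id",
  pmod(hash(df.columns.map(col): _*), lit(numPartitions)))
```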

## Cast

Cast operations in Comet fall into three levels of support:
12 changes: 12 additions & 0 deletions native/core/src/execution/planner.rs
@@ -2342,6 +2342,18 @@ impl PhysicalPlanner {
))
}
PartitioningStruct::SinglePartition(_) => Ok(CometPartitioning::SinglePartition),
PartitioningStruct::RoundRobinPartition(rr_partition) => {
// Treat negative max_hash_columns as 0 (no limit)
let max_hash_columns = if rr_partition.max_hash_columns <= 0 {
0
} else {
rr_partition.max_hash_columns as usize
};
Ok(CometPartitioning::RoundRobin(
rr_partition.num_partitions as usize,
max_hash_columns,
))
}
}
}

6 changes: 5 additions & 1 deletion native/core/src/execution/shuffle/comet_partitioning.rs
@@ -31,14 +31,18 @@ pub enum CometPartitioning {
/// Rows for comparing to 4) OwnedRows that represent the boundaries of each partition, used with
/// LexOrdering to bin each value in the RecordBatch to a partition.
RangePartitioning(LexOrdering, usize, Arc<RowConverter>, Vec<OwnedRow>),
/// Round robin partitioning, implemented as hash partitioning on the input columns:
/// a Murmur3 hash (seed 42) is computed per row and the partition is `hash % num_partitions`.
/// Args are: 1) number of partitions, 2) max columns to hash (0 means no limit).
RoundRobin(usize, usize),
}

impl CometPartitioning {
pub fn partition_count(&self) -> usize {
use CometPartitioning::*;
match self {
SinglePartition => 1,
Hash(_, n) | RangePartitioning(_, n, _, _) => *n,
Hash(_, n) | RangePartitioning(_, n, _, _) | RoundRobin(n, _) => *n,
}
}
}
161 changes: 159 additions & 2 deletions native/core/src/execution/shuffle/shuffle_writer.rs
@@ -382,8 +382,11 @@ impl MultiPartitionShuffleRepartitioner {
// The initial values are not used.
let scratch = ScratchSpace {
hashes_buf: match partitioning {
// Only allocate the hashes_buf if hash partitioning.
CometPartitioning::Hash(_, _) => vec![0; batch_size],
// Allocate hashes_buf for hash and round robin partitioning.
// Round robin hashes all columns to achieve even, deterministic distribution.
CometPartitioning::Hash(_, _) | CometPartitioning::RoundRobin(_, _) => {
vec![0; batch_size]
}
_ => vec![],
},
partition_ids: vec![0; batch_size],
@@ -598,6 +601,68 @@ impl MultiPartitionShuffleRepartitioner {
.await?;
self.scratch = scratch;
}
CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => {
// Comet implements "round robin" as hash partitioning on columns.
// This achieves the same goal as Spark's round robin (even distribution
// without semantic grouping) while being deterministic for fault tolerance.
//
// Note: This produces different partition assignments than Spark's round robin,
// which sorts by UnsafeRow binary representation before assigning partitions.
// However, both approaches provide even distribution and determinism.
let mut scratch = std::mem::take(&mut self.scratch);
let (partition_starts, partition_row_indices): (&Vec<u32>, &Vec<u32>) = {
let mut timer = self.metrics.repart_time.timer();

let num_rows = input.num_rows();

// Collect columns for hashing, respecting max_hash_columns limit
// max_hash_columns of 0 means no limit (hash all columns)
// Negative values are normalized to 0 in the planner
let num_columns_to_hash = if *max_hash_columns == 0 {
input.num_columns()
} else {
(*max_hash_columns).min(input.num_columns())
Contributor: we need to treat negative values properly
};
let columns_to_hash: Vec<ArrayRef> = (0..num_columns_to_hash)
.map(|i| Arc::clone(input.column(i)))
.collect();

// Use the same seed (42) as Spark hash partitioning.
let hashes_buf = &mut scratch.hashes_buf[..num_rows];
hashes_buf.fill(42_u32);

// Compute hash for selected columns
create_murmur3_hashes(&columns_to_hash, hashes_buf)?;

// Assign partition IDs based on hash (same as hash partitioning)
let partition_ids = &mut scratch.partition_ids[..num_rows];
hashes_buf.iter().enumerate().for_each(|(idx, hash)| {
partition_ids[idx] = pmod(*hash, *num_output_partitions) as u32;
});

// We now have partition ids for every input row, map that to partition starts
// and partition indices to eventually write these rows to partition buffers.
map_partition_ids_to_starts_and_indices(
&mut scratch,
*num_output_partitions,
num_rows,
);

timer.stop();
Ok::<(&Vec<u32>, &Vec<u32>), DataFusionError>((
&scratch.partition_starts,
&scratch.partition_row_indices,
))
}?;

self.buffer_partitioned_batch_may_spill(
input,
partition_row_indices,
partition_starts,
)
.await?;
self.scratch = scratch;
}
other => {
// this should be unreachable as long as the validation logic
// in the constructor is kept up-to-date
@@ -1431,6 +1496,7 @@ mod test {
Arc::new(row_converter),
owned_rows,
),
CometPartitioning::RoundRobin(num_partitions, 0),
] {
let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();

Expand Down Expand Up @@ -1483,4 +1549,95 @@ mod test {
let expected = vec![69, 5, 193, 171, 115];
assert_eq!(result, expected);
}

#[test]
#[cfg_attr(miri, ignore)]
fn test_round_robin_deterministic() {
// Test that round robin partitioning produces identical results when run multiple times
use std::fs;
use std::io::Read;

let batch_size = 1000;
let num_batches = 10;
let num_partitions = 8;

let batch = create_batch(batch_size);
let batches = (0..num_batches).map(|_| batch.clone()).collect::<Vec<_>>();

// Run shuffle twice and compare results
for run in 0..2 {
let data_file = format!("/tmp/rr_data_{}.out", run);
let index_file = format!("/tmp/rr_index_{}.out", run);

let partitions = std::slice::from_ref(&batches);
let exec = ShuffleWriterExec::try_new(
Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(partitions, batch.schema(), None).unwrap(),
))),
CometPartitioning::RoundRobin(num_partitions, 0),
CompressionCodec::Zstd(1),
data_file.clone(),
index_file.clone(),
false,
1024 * 1024,
)
.unwrap();

let config = SessionConfig::new();
let runtime_env = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(10 * 1024 * 1024, 1.0)
.build()
.unwrap(),
);
let session_ctx = Arc::new(SessionContext::new_with_config_rt(config, runtime_env));
let task_ctx = Arc::new(TaskContext::from(session_ctx.as_ref()));

// Execute the shuffle
futures::executor::block_on(async {
let mut stream = exec.execute(0, Arc::clone(&task_ctx)).unwrap();
while stream.next().await.is_some() {}
});

if run == 1 {
// Compare data files
let mut data0 = Vec::new();
fs::File::open("/tmp/rr_data_0.out")
.unwrap()
.read_to_end(&mut data0)
.unwrap();
let mut data1 = Vec::new();
fs::File::open("/tmp/rr_data_1.out")
.unwrap()
.read_to_end(&mut data1)
.unwrap();
assert_eq!(
data0, data1,
"Round robin shuffle data should be identical across runs"
);

// Compare index files
let mut index0 = Vec::new();
fs::File::open("/tmp/rr_index_0.out")
.unwrap()
.read_to_end(&mut index0)
.unwrap();
let mut index1 = Vec::new();
fs::File::open("/tmp/rr_index_1.out")
.unwrap()
.read_to_end(&mut index1)
.unwrap();
assert_eq!(
index0, index1,
"Round robin shuffle index should be identical across runs"
);
}
}

// Clean up
let _ = fs::remove_file("/tmp/rr_data_0.out");
let _ = fs::remove_file("/tmp/rr_index_0.out");
let _ = fs::remove_file("/tmp/rr_data_1.out");
let _ = fs::remove_file("/tmp/rr_index_1.out");
}
}
7 changes: 7 additions & 0 deletions native/proto/src/proto/partitioning.proto
@@ -31,6 +31,7 @@ message Partitioning {
HashPartition hash_partition = 1;
SinglePartition single_partition = 2;
RangePartition range_partition = 3;
RoundRobinPartition round_robin_partition = 4;
}
}

@@ -51,3 +52,9 @@ message RangePartition {
int32 num_partitions = 2;
repeated BoundaryRow boundary_rows = 4;
}

message RoundRobinPartition {
int32 num_partitions = 1;
// Maximum number of columns to hash. 0 means no limit (hash all columns).
int32 max_hash_columns = 2;
}