Skip to content

Instantly share code, notes, and snippets.

@pepijnve
Created June 9, 2025 16:51
Show Gist options
  • Select an option

  • Save pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a to your computer and use it in GitHub Desktop.

Select an option

Save pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a to your computer and use it in GitHub Desktop.
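Interleave test patch: simplifies `test_infinite_interleave_cancel` (datafusion/core/tests/execution/infinite_cancel.rs) and changes the partitioning declared in datafusion/physical-plan/src/memory.rs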
```
Index: datafusion/core/tests/execution/infinite_cancel.rs
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/datafusion/core/tests/execution/infinite_cancel.rs b/datafusion/core/tests/execution/infinite_cancel.rs
--- a/datafusion/core/tests/execution/infinite_cancel.rs (revision 287b2a9e9eb7f779d7bca99306a6b76d58965d92)
+++ b/datafusion/core/tests/execution/infinite_cancel.rs (date 1749487490790)
@@ -220,11 +220,8 @@
Ok(())
}
-#[rstest]
#[tokio::test]
-async fn test_infinite_interleave_cancel(
- #[values(false, true)] pretend_finite: bool,
-) -> Result<(), Box<dyn Error>> {
+async fn test_infinite_interleave_cancel() -> Result<(), Box<dyn Error>> {
// 1) Build a session and a schema with one i64 column.
let session_ctx = SessionContext::new();
let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
@@ -246,7 +243,7 @@
for thr in thresholds {
// 2a) Set up the infinite source
- let inf = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite);
+ let inf = make_lazy_exec(batch.clone(), schema.clone(), false);
// 2b) Apply a FilterExec with predicate "value > thr".
let filter_expr = Arc::new(BinaryExpr::new(
@@ -258,15 +255,15 @@
// 2c) Wrap the filtered stream in CoalesceBatchesExec so it emits
// one 8192-row batch at a time.
- let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8192));
+ // let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8192));
// 2d) Repartition each coalesced stream by hashing on "value" into 1 partition.
// Required for InterleaveExec::try_new to succeed.
- let exprs = vec![Arc::new(Column::new_with_schema("value", &schema)?) as _];
- let partitioning = Partitioning::Hash(exprs, 1);
- let hashed = Arc::new(RepartitionExec::try_new(coalesced, partitioning)?);
+ // let exprs = vec![Arc::new(Column::new_with_schema("value", &schema)?) as _];
+ // let partitioning = Partitioning::Hash(exprs, 1);
+ // let hashed = Arc::new(RepartitionExec::try_new(coalesced, partitioning)?);
- infinite_children.push(hashed as Arc<dyn ExecutionPlan>);
+ infinite_children.push(filtered as Arc<dyn ExecutionPlan>);
}
// 3) Build an InterleaveExec over all infinite children.
Index: datafusion/physical-plan/src/memory.rs
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
--- a/datafusion/physical-plan/src/memory.rs (revision 287b2a9e9eb7f779d7bca99306a6b76d58965d92)
+++ b/datafusion/physical-plan/src/memory.rs (date 1749487603099)
@@ -162,7 +162,7 @@
) -> Result<Self> {
let cache = PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
- Partitioning::RoundRobinBatch(generators.len()),
+ Partitioning::Hash(vec![], 1),
EmissionType::Incremental,
Boundedness::Bounded,
);
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment