Created June 9, 2025 16:51
-
-
Save pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a to your computer and use it in GitHub Desktop.
interleave test patch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ``` | |
| Index: datafusion/core/tests/execution/infinite_cancel.rs | |
| IDEA additional info: | |
| Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
| <+>UTF-8 | |
| =================================================================== | |
| diff --git a/datafusion/core/tests/execution/infinite_cancel.rs b/datafusion/core/tests/execution/infinite_cancel.rs | |
| --- a/datafusion/core/tests/execution/infinite_cancel.rs (revision 287b2a9e9eb7f779d7bca99306a6b76d58965d92) | |
| +++ b/datafusion/core/tests/execution/infinite_cancel.rs (date 1749487490790) | |
| @@ -220,11 +220,8 @@ | |
| Ok(()) | |
| } | |
| -#[rstest] | |
| #[tokio::test] | |
| -async fn test_infinite_interleave_cancel( | |
| - #[values(false, true)] pretend_finite: bool, | |
| -) -> Result<(), Box<dyn Error>> { | |
| +async fn test_infinite_interleave_cancel() -> Result<(), Box<dyn Error>> { | |
| // 1) Build a session and a schema with one i64 column. | |
| let session_ctx = SessionContext::new(); | |
| let schema = Arc::new(Schema::new(Fields::from(vec![Field::new( | |
| @@ -246,7 +243,7 @@ | |
| for thr in thresholds { | |
| // 2a) Set up the infinite source | |
| - let inf = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite); | |
| + let inf = make_lazy_exec(batch.clone(), schema.clone(), false); | |
| // 2b) Apply a FilterExec with predicate "value > thr". | |
| let filter_expr = Arc::new(BinaryExpr::new( | |
| @@ -258,15 +255,15 @@ | |
| // 2c) Wrap the filtered stream in CoalesceBatchesExec so it emits | |
| // one 8192-row batch at a time. | |
| - let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8192)); | |
| + // let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8192)); | |
| // 2d) Repartition each coalesced stream by hashing on "value" into 1 partition. | |
| // Required for InterleaveExec::try_new to succeed. | |
| - let exprs = vec![Arc::new(Column::new_with_schema("value", &schema)?) as _]; | |
| - let partitioning = Partitioning::Hash(exprs, 1); | |
| - let hashed = Arc::new(RepartitionExec::try_new(coalesced, partitioning)?); | |
| + // let exprs = vec![Arc::new(Column::new_with_schema("value", &schema)?) as _]; | |
| + // let partitioning = Partitioning::Hash(exprs, 1); | |
| + // let hashed = Arc::new(RepartitionExec::try_new(coalesced, partitioning)?); | |
| - infinite_children.push(hashed as Arc<dyn ExecutionPlan>); | |
| + infinite_children.push(filtered as Arc<dyn ExecutionPlan>); | |
| } | |
| // 3) Build an InterleaveExec over all infinite children. | |
| Index: datafusion/physical-plan/src/memory.rs | |
| IDEA additional info: | |
| Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP | |
| <+>UTF-8 | |
| =================================================================== | |
| diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs | |
| --- a/datafusion/physical-plan/src/memory.rs (revision 287b2a9e9eb7f779d7bca99306a6b76d58965d92) | |
| +++ b/datafusion/physical-plan/src/memory.rs (date 1749487603099) | |
| @@ -162,7 +162,7 @@ | |
| ) -> Result<Self> { | |
| let cache = PlanProperties::new( | |
| EquivalenceProperties::new(Arc::clone(&schema)), | |
| - Partitioning::RoundRobinBatch(generators.len()), | |
| + Partitioning::Hash(vec![], 1), | |
| EmissionType::Incremental, | |
| Boundedness::Bounded, | |
| ); | |
| ``` |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.