Created
May 28, 2025 14:51
-
-
Save pepijnve/c013a697b1869ea067e793bf3e1e115a to your computer and use it in GitHub Desktop.
Infinitely running DataFusion query
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use arrow::array::{Int64Array, RecordBatch}; | |
| use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; | |
| use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; | |
| use datafusion::functions_aggregate::sum; | |
| use datafusion::physical_expr::aggregate::AggregateExprBuilder; | |
| use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; | |
| use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; | |
| use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; | |
| use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder; | |
| use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; | |
| use datafusion::{common, physical_plan}; | |
| use datafusion_ext::context::create_session_context; | |
| use futures::{Stream, StreamExt}; | |
| use std::any::Any; | |
| use std::error::Error; | |
| use std::fmt::Formatter; | |
| use std::pin::Pin; | |
| use std::sync::Arc; | |
| use std::task::Poll::Ready; | |
| use std::task::{Context, Poll}; | |
| use tokio::signal; | |
| struct InfiniteStream { | |
| batch: RecordBatch, | |
| poll_count: usize, | |
| } | |
| impl RecordBatchStream for InfiniteStream { | |
| fn schema(&self) -> SchemaRef { | |
| self.batch.schema() | |
| } | |
| } | |
| impl Stream for InfiniteStream { | |
| type Item = common::Result<RecordBatch>; | |
| fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
| self.poll_count += 1; | |
| if self.poll_count % 10000 == 0 { | |
| println!("InfiniteStream::poll_next {} times", self.poll_count); | |
| } | |
| Ready(Some(Ok(self.batch.clone()))) | |
| } | |
| } | |
| #[derive(Debug)] | |
| struct InfiniteExec { | |
| batch: RecordBatch, | |
| properties: PlanProperties, | |
| } | |
| impl InfiniteExec { | |
| fn new(batch: &RecordBatch) -> Self { | |
| InfiniteExec { | |
| batch: batch.clone(), | |
| properties: PlanProperties::new( | |
| EquivalenceProperties::new(batch.schema().clone()), | |
| Partitioning::UnknownPartitioning(1), | |
| EmissionType::Incremental, | |
| Boundedness::Unbounded { | |
| requires_infinite_memory: false, | |
| }, | |
| ), | |
| } | |
| } | |
| } | |
| impl DisplayAs for InfiniteExec { | |
| fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { | |
| write!(f, "infinite") | |
| } | |
| } | |
| impl ExecutionPlan for InfiniteExec { | |
| fn name(&self) -> &str { | |
| "infinite" | |
| } | |
| fn as_any(&self) -> &dyn Any { | |
| self | |
| } | |
| fn schema(&self) -> SchemaRef { | |
| self.batch.schema() | |
| } | |
| fn properties(&self) -> &PlanProperties { | |
| &self.properties | |
| } | |
| fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { | |
| vec![] | |
| } | |
| fn with_new_children( | |
| self: Arc<Self>, | |
| _children: Vec<Arc<dyn ExecutionPlan>>, | |
| ) -> common::Result<Arc<dyn ExecutionPlan>> { | |
| Ok(self.clone()) | |
| } | |
| fn execute( | |
| &self, | |
| _partition: usize, | |
| _context: Arc<TaskContext>, | |
| ) -> common::Result<SendableRecordBatchStream> { | |
| Ok(Box::pin(InfiniteStream { | |
| batch: self.batch.clone(), | |
| poll_count: 0, | |
| })) | |
| } | |
| } | |
| #[tokio::main] | |
| async fn main() -> Result<(), Box<dyn Error>> { | |
| { | |
| let session_context = create_session_context(None, None).await?; | |
| let schema = Arc::new(Schema::new(Fields::try_from(vec![Field::new( | |
| "value", | |
| DataType::Int64, | |
| false, | |
| )])?)); | |
| let mut column_builder = Int64Array::builder(8192); | |
| for v in 0..8192 { | |
| column_builder.append_value(v as i64); | |
| } | |
| let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column_builder.finish())])?; | |
| let inf = Arc::new(InfiniteExec::new(&batch)); | |
| let aggr = Arc::new(AggregateExec::try_new( | |
| AggregateMode::Single, | |
| PhysicalGroupBy::new(vec![], vec![], vec![]), | |
| vec![Arc::new( | |
| AggregateExprBuilder::new( | |
| sum::sum_udaf(), | |
| vec![Arc::new( | |
| datafusion::physical_expr::expressions::Column::new_with_schema( | |
| "value", &schema, | |
| )?, | |
| )], | |
| ) | |
| .schema(inf.schema().clone()) | |
| .alias("sum") | |
| .build()?, | |
| )], | |
| vec![None], | |
| inf.clone(), | |
| inf.schema(), | |
| )?); | |
| let mut stream = physical_plan::execute_stream(aggr, session_context.task_ctx())?; | |
| let mut builder = RecordBatchReceiverStreamBuilder::new(stream.schema(), 10); | |
| let tx = builder.tx(); | |
| builder.spawn(async move { | |
| while let Some(item) = stream.next().await { | |
| let _ = tx.send(item).await; | |
| } | |
| Ok(()) | |
| }); | |
| let mut stream = builder.build(); | |
| println!("Running query"); | |
| let next = tokio::select! { | |
| res = stream.next() => res, | |
| _ = signal::ctrl_c() => { | |
| println!("ctrl-C"); | |
| None | |
| }, | |
| }; | |
| match next { | |
| None => { | |
| println!("No result"); | |
| } | |
| Some(Ok(batch)) => { | |
| println!("Batch of size {}", batch.num_rows()); | |
| } | |
| Some(Err(_)) => { | |
| println!("Error"); | |
| } | |
| } | |
| println!("Dropping stream"); | |
| drop(stream); | |
| } | |
| Ok(()) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment