Skip to content

Instantly share code, notes, and snippets.

@pepijnve
Created May 28, 2025 14:51
Show Gist options
  • Select an option

  • Save pepijnve/c013a697b1869ea067e793bf3e1e115a to your computer and use it in GitHub Desktop.

Select an option

Save pepijnve/c013a697b1869ea067e793bf3e1e115a to your computer and use it in GitHub Desktop.
Infinitely running DataFusion query
use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::functions_aggregate::sum;
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion::{common, physical_plan};
use datafusion_ext::context::create_session_context;
use futures::{Stream, StreamExt};
use std::any::Any;
use std::error::Error;
use std::fmt::Formatter;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll::Ready;
use std::task::{Context, Poll};
use tokio::signal;
/// A leaf stream that hands out clones of the same [`RecordBatch`] forever.
///
/// `poll_next` never returns `Poll::Pending` and never returns `None`. A
/// consumer draining it in a loop therefore never observes a pending state
/// coming from this stream. That is deliberate for this reproducer.
struct InfiniteStream {
    /// Batch cloned and returned on every poll.
    batch: RecordBatch,
    /// How many times `poll_next` has been called; drives progress logging.
    poll_count: usize,
}

impl RecordBatchStream for InfiniteStream {
    /// The schema of the repeated batch.
    fn schema(&self) -> SchemaRef {
        let repeated = &self.batch;
        repeated.schema()
    }
}

impl Stream for InfiniteStream {
    type Item = common::Result<RecordBatch>;

    /// Immediately ready with another copy of `batch`.
    ///
    /// Every 10000th call prints the running poll count to stdout so progress
    /// is visible while the consumer spins.
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let polls = {
            self.poll_count += 1;
            self.poll_count
        };
        let should_log = polls % 10000 == 0;
        if should_log {
            println!("InfiniteStream::poll_next {} times", polls);
        }
        let next = self.batch.clone();
        Ready(Some(Ok(next)))
    }
}
/// Leaf [`ExecutionPlan`] with a single output partition.
///
/// The partition's stream is an [`InfiniteStream`] that repeats `batch`
/// forever. The plan advertises incremental emission and an unbounded input
/// that does not require infinite memory.
#[derive(Debug)]
struct InfiniteExec {
    /// Batch repeated by every stream this plan produces.
    batch: RecordBatch,
    /// Plan properties, computed once in [`InfiniteExec::new`].
    properties: PlanProperties,
}

impl InfiniteExec {
    /// Creates a plan that repeats a clone of `batch`.
    ///
    /// The batch's schema becomes the plan's output schema.
    fn new(batch: &RecordBatch) -> Self {
        InfiniteExec {
            batch: batch.clone(),
            properties: PlanProperties::new(
                // `RecordBatch::schema` already returns an owned `SchemaRef`.
                EquivalenceProperties::new(batch.schema()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Unbounded {
                    requires_infinite_memory: false,
                },
            ),
        }
    }
}

impl DisplayAs for InfiniteExec {
    /// Renders the node as `infinite` regardless of the requested format.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "infinite")
    }
}

impl ExecutionPlan for InfiniteExec {
    fn name(&self) -> &str {
        "infinite"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    /// Leaf node: there are no inputs.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    /// Leaf node: there is nothing to replace, so the same plan is returned.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> common::Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Returns a fresh [`InfiniteStream`] for partition 0.
    ///
    /// # Errors
    /// Returns an internal error for any other partition index, because the
    /// plan declares exactly one output partition.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> common::Result<SendableRecordBatchStream> {
        if partition != 0 {
            return Err(common::DataFusionError::Internal(format!(
                "InfiniteExec has a single partition, but partition {partition} was requested"
            )));
        }
        Ok(Box::pin(InfiniteStream {
            batch: self.batch.clone(),
            poll_count: 0,
        }))
    }
}
/// Reproducer: runs `SUM(value)` with no grouping over an [`InfiniteExec`] input.
///
/// The query stream is driven from a task spawned through
/// [`RecordBatchReceiverStreamBuilder`]. `main` waits for either the first
/// output batch or Ctrl-C, reports what happened, and then drops the stream.
/// Presumably the aggregate can only emit after its input ends, which this
/// input never does. Confirm against DataFusion's `AggregateExec` semantics.
///
/// # Errors
/// Propagates failures from creating the session context, building the input
/// batch, creating the aggregate expression, planning the aggregate, or
/// starting execution.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Inner scope so all query state is dropped before `main` returns.
    {
        let session_context = create_session_context(None, None).await?;
        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
            "value",
            DataType::Int64,
            false,
        )])));
        let values = Int64Array::from_iter_values(0..8192);
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(values)])?;
        let inf = Arc::new(InfiniteExec::new(&batch));

        let sum_expr = AggregateExprBuilder::new(
            sum::sum_udaf(),
            vec![Arc::new(
                datafusion::physical_expr::expressions::Column::new_with_schema("value", &schema)?,
            )],
        )
        .schema(inf.schema())
        .alias("sum")
        .build()?;
        let aggr = Arc::new(AggregateExec::try_new(
            AggregateMode::Single,
            PhysicalGroupBy::new(vec![], vec![], vec![]),
            vec![Arc::new(sum_expr)],
            vec![None],
            inf.clone(),
            inf.schema(),
        )?);

        let mut stream = physical_plan::execute_stream(aggr, session_context.task_ctx())?;
        let mut builder = RecordBatchReceiverStreamBuilder::new(stream.schema(), 10);
        let tx = builder.tx();
        builder.spawn(async move {
            while let Some(item) = stream.next().await {
                // A send error means the receiving stream was dropped: stop
                // pulling from the query instead of running on unobserved.
                if tx.send(item).await.is_err() {
                    break;
                }
            }
            Ok(())
        });
        let mut stream = builder.build();

        println!("Running query");
        let next = tokio::select! {
            res = stream.next() => res,
            sig = signal::ctrl_c() => {
                // An Err here means the handler could not be installed; it is
                // not a real Ctrl-C, so report it separately.
                match sig {
                    Ok(()) => println!("ctrl-C"),
                    Err(e) => println!("Failed to listen for ctrl-C: {e}"),
                }
                None
            },
        };
        match next {
            None => {
                println!("No result");
            }
            Some(Ok(batch)) => {
                println!("Batch of size {}", batch.num_rows());
            }
            Some(Err(e)) => {
                println!("Error: {e}");
            }
        }
        println!("Dropping stream");
        drop(stream);
    }
    Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment