Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save rustyconover/6ff8cbd93369735287d80ae60436379e to your computer and use it in GitHub Desktop.

Select an option

Save rustyconover/6ff8cbd93369735287d80ae60436379e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Demonstration of Arrow IPC serialization size and why zero-row record batches
are not optimal for delivering arbitrary binary data in an IPC stream.
"""
import io
import pyarrow as pa
import pyarrow.ipc as ipc
def get_batch_message_size(schema: pa.Schema) -> int:
    """Measure the serialized size of an empty record batch message.

    A zero-row batch matching ``schema`` is written to an in-memory IPC
    stream. The stream is then read back message by message, and the
    distance the reader position advances while consuming the message
    that follows the schema message is returned.

    Args:
        schema: Schema the empty batch is built from.

    Returns:
        Difference between the reader offset after the second message
        read and the offset right after the schema message, in bytes.
    """
    empty_batch = pa.RecordBatch.from_arrays(
        [pa.array([], type=column.type) for column in schema],
        schema=schema,
    )

    buffer = io.BytesIO()
    with ipc.new_stream(buffer, empty_batch.schema) as stream_writer:
        stream_writer.write_batch(empty_batch)

    # Re-read the serialized stream, tracking the reader offset.
    reader = pa.BufferReader(buffer.getvalue())
    ipc.read_message(reader)  # first message is the schema; discard it
    start_offset = reader.tell()

    try:
        ipc.read_message(reader)  # the record batch message
    except EOFError:
        # Tolerated: whatever offset was reached is still used below.
        pass

    return reader.tell() - start_offset
# Schemas under test, keyed by a short descriptive label.
# Fields are spelled as (name, type) pairs.
schemas = {
    # Flat schemas with a single column
    "single_int32": pa.schema([("f1", pa.int32())]),
    "single_int64": pa.schema([("f1", pa.int64())]),
    "single_string": pa.schema([("f1", pa.string())]),
    "single_binary": pa.schema([("f1", pa.binary())]),
    # Flat schemas with several columns
    "multiple_int32": pa.schema(
        [
            ("f1", pa.int32()),
            ("f2", pa.int32()),
            ("f3", pa.int32()),
        ]
    ),
    "multiple_mixed": pa.schema(
        [
            ("i1", pa.int32()),
            ("s1", pa.string()),
            ("i2", pa.int64()),
        ]
    ),
    # Schemas containing struct (and list) columns
    "simple_struct": pa.schema(
        [
            ("s", pa.struct([("a", pa.int32()), ("b", pa.int32())])),
        ]
    ),
    "nested_struct": pa.schema(
        [
            (
                "outer",
                pa.struct(
                    [
                        ("inner", pa.struct([("value", pa.int32())])),
                        ("name", pa.string()),
                    ]
                ),
            ),
        ]
    ),
    "complex_struct": pa.schema(
        [
            ("id", pa.int64()),
            (
                "data",
                pa.struct(
                    [
                        ("x", pa.float64()),
                        ("y", pa.float64()),
                        ("label", pa.string()),
                    ]
                ),
            ),
            ("tags", pa.list_(pa.string())),
        ]
    ),
}
# --- Report banner ---
rule = "=" * 60
for banner_line in (rule, " Zero-Row Record Batch Overhead", rule):
    print(banner_line)

# --- Dump every schema, one rendered line at a time ---
print("\nSchema definitions:\n")
for name, schema in schemas.items():
    print(f"{name}:")
    for line in str(schema).splitlines():
        print(f" {line}")
    print()  # blank line between schemas

# --- Table of measured message sizes, one row per schema ---
print("Zero-row record batch message overhead by schema:")
header = f"{'Schema':<20} {'Batch Msg (bytes)':<20}"
print(header)
print("-" * 40)
for name, schema in schemas.items():
    size = get_batch_message_size(schema)
    print(f"{name:<20} {size:<20}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment