@aliou
Created February 5, 2026 01:47
Repro/test for streaming image drop bug (pi-mono PR #1271)

Image delivery repro tests

Two test artifacts for verifying that images are not dropped when steer / follow_up are used during streaming.

RPC test (rpc-test.mjs)

Spawns pi in RPC mode and runs three tests:

  1. prompt with image (non-streaming baseline) -- should always pass
  2. steer with image during streaming -- fails before fix
  3. follow_up with image during streaming -- fails before fix
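
The script talks to pi over stdin/stdout using newline-delimited JSON. As a rough sketch of what the three tests write to stdin, based on the send() helper in rpc-test.mjs (the encodeCommand helper, the shortened messages, and the placeholder image data are illustrative, not part of pi):

```javascript
// Illustrative helper (not part of pi): serialize one RPC command as a JSON line,
// mirroring what send() in rpc-test.mjs writes to the child's stdin.
function encodeCommand(id, type, message, images) {
  const command = { type, message };
  if (images) command.images = images;
  command.id = id;
  return JSON.stringify(command) + "\n";
}

// Placeholder payload; the real script embeds a base64-encoded 1x1 PNG here.
const image = { type: "image", data: "<base64 PNG data>", mimeType: "image/png" };

const lines = [
  encodeCommand("r_1", "prompt", "Describe the image.", [image]),
  encodeCommand("r_2", "steer", "Stop. Describe this image instead.", [image]),
  encodeCommand("r_3", "follow_up", "Now describe this image.", [image]),
];

// Each command is one line of JSON terminated by "\n".
for (const line of lines) process.stdout.write(line);
```

Only the type field differs between the three commands; the bug under test is whether the images array survives the steer and follow_up paths.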

For each test it inspects message_start events for user messages and checks whether the content array includes image blocks.
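
To make the check concrete, here is a minimal sketch of the same test applied to hand-written events. The sample events are invented for illustration and carry only the fields the check reads (type, message.role, message.content); real events from pi contain more.

```javascript
// Returns true if the event is a user message_start whose content array
// contains at least one image block.
function userMessageHasImage(event) {
  if (event.type !== "message_start") return false;
  const msg = event.message;
  if (msg?.role !== "user" || !Array.isArray(msg.content)) return false;
  return msg.content.some((block) => block.type === "image");
}

// Hand-written sample events (illustrative only).
const delivered = {
  type: "message_start",
  message: {
    role: "user",
    content: [
      { type: "text", text: "Describe this image." },
      { type: "image", data: "<base64>", mimeType: "image/png" },
    ],
  },
};
const dropped = {
  type: "message_start",
  message: { role: "user", content: [{ type: "text", text: "Describe this image." }] },
};

console.log(userMessageHasImage(delivered)); // true
console.log(userMessageHasImage(dropped)); // false
```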

# Test installed pi (before fix)
node rpc-test.mjs pi

# Test local build (after fix)
node rpc-test.mjs node ./packages/coding-agent/dist/cli.js

Extension test (test-image-ext.ts)

An extension that registers /test-image <path>. When invoked it reads the image, base64-encodes it, and sends it via pi.sendUserMessage() with deliverAs: "steer".

It also hooks agent_end to log every user message's content types to /tmp/test-image-ext.log.

Setup

cp test-image-ext.ts ~/.pi/agent/extensions/

Usage

  1. Start pi interactively
  2. Send a prompt that keeps the agent busy for a few seconds (e.g. ask it to run: echo hello && sleep 5)
  3. While the agent is streaming/executing, run: /test-image /path/to/some/image.png
  4. Wait for agent to finish
  5. Check: cat /tmp/test-image-ext.log

If images are delivered correctly, you'll see:

[user] content types: text, image

If images are dropped (the bug), you'll see only:

[user] content types: text
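
If you would rather not eyeball the log, the check can be scripted. The following is a sketch that assumes log lines have the "[user] content types: ..." shape shown above, each prefixed by a timestamp as logToFile does; summarizeLog is a hypothetical helper, not part of the extension, and the sample log text is made up.

```javascript
// Hypothetical helper: extract the content types of every logged user message.
function summarizeLog(text) {
  const marker = "[user] content types: ";
  const messages = [];
  for (const line of text.split("\n")) {
    const at = line.indexOf(marker);
    if (at === -1) continue; // not a user-message line
    const types = line
      .slice(at + marker.length)
      .split(",")
      .map((t) => t.trim())
      .filter(Boolean);
    messages.push({ types, hasImage: types.includes("image") });
  }
  return messages;
}

// Made-up sample in the format the extension writes.
const sample = [
  "[12:00:00.000] agent_end: 4 messages",
  "[12:00:00.001]  [user] content types: text",
  "[12:00:00.002]  [assistant] (skipped)",
  "[12:00:00.003]  [user] content types: text, image",
].join("\n");

for (const m of summarizeLog(sample)) {
  console.log(m.hasImage ? "image delivered" : "text only", m.types);
}
```

To run it against the real file, pass the result of readFileSync("/tmp/test-image-ext.log", "utf8") (from node:fs) instead of sample.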

Cleanup

rm ~/.pi/agent/extensions/test-image-ext.ts

rpc-test.mjs

#!/usr/bin/env node
/**
 * RPC test script for verifying image delivery through steer/follow_up.
 *
 * Usage:
 *   node rpc-test.mjs pi                         # test installed pi
 *   node rpc-test.mjs node /path/to/dist/cli.js  # test local build
 *
 * Runs three tests:
 *   1. prompt with image (non-streaming baseline) -- should always work
 *   2. steer with image (during streaming)        -- broken before fix
 *   3. follow_up with image (during streaming)    -- broken before fix
 *
 * Tests 2 and 3 use a bash tool call (sleep) to create a reliable window
 * where the agent is streaming, then inject steer/follow_up with an image.
 */
import { spawn } from "node:child_process";
import { createInterface } from "node:readline";
// Minimal 1x1 RGBA PNG (one semi-transparent yellow pixel), base64-encoded
const TEST_IMAGE_B64 =
  "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg==";
const IMAGE_CONTENT = {
  type: "image",
  data: TEST_IMAGE_B64,
  mimeType: "image/png",
};
// ---------------------------------------------------------------------------
// helpers
// ---------------------------------------------------------------------------
function log(msg) {
  const ts = new Date().toISOString().slice(11, 23);
  console.log(`[${ts}] ${msg}`);
}
function logResult(testName, hasImages) {
  const status = hasImages ? "PASS (images present)" : "FAIL (images missing)";
  console.log(`\n >> ${testName}: ${status}\n`);
}
// ---------------------------------------------------------------------------
// spawn pi in RPC mode
// ---------------------------------------------------------------------------
function spawnPi(argv) {
  const [cmd, ...rest] = argv;
  const args = [...rest, "--mode", "rpc", "--no-extensions"];
  log(`Spawning: ${cmd} ${args.join(" ")}`);
  const proc = spawn(cmd, args, {
    stdio: ["pipe", "pipe", "pipe"],
    env: { ...process.env },
  });
  proc.stderr.on("data", (d) => {
    process.stderr.write(d);
  });
  const rl = createInterface({ input: proc.stdout, terminal: false });
  const listeners = [];
  rl.on("line", (line) => {
    try {
      const obj = JSON.parse(line);
      // Iterate over a snapshot: listeners unsubscribe themselves while being
      // called, and splicing the live array would skip the next listener.
      for (const fn of [...listeners]) fn(obj);
    } catch {
      // ignore non-JSON
    }
  });
  function onEvent(fn) {
    listeners.push(fn);
    return () => {
      const i = listeners.indexOf(fn);
      if (i !== -1) listeners.splice(i, 1);
    };
  }
  let reqId = 0;
  // Send a command and resolve with the response that carries the same id
  function send(command) {
    const id = `r_${++reqId}`;
    const full = { ...command, id };
    return new Promise((resolve) => {
      const unsub = onEvent((obj) => {
        if (obj.type === "response" && obj.id === id) {
          unsub();
          resolve(obj);
        }
      });
      proc.stdin.write(JSON.stringify(full) + "\n");
    });
  }
  // Collect events until agent_end
  function collectUntilIdle(timeoutMs = 120_000) {
    return new Promise((resolve, reject) => {
      const events = [];
      const timer = setTimeout(() => {
        unsub();
        reject(new Error(`Timeout waiting for agent_end (got ${events.length} events)`));
      }, timeoutMs);
      const unsub = onEvent((obj) => {
        if (obj.type === "response") return;
        events.push(obj);
        // Log event types live
        if (obj.type === "message_update") {
          const delta = obj.assistantMessageEvent;
          if (delta?.type === "text_delta") process.stdout.write(delta.delta);
        } else if (obj.type === "message_end" && obj.message?.role === "assistant") {
          process.stdout.write("\n");
          const msg = obj.message;
          const content = msg.content || [];
          const types = content.map((c) => c.type);
          log(` event: message_end [assistant] content_types=[${types}] stop=${msg.stopReason}${msg.errorMessage ? ` error="${msg.errorMessage}"` : ""}`);
        } else {
          log(` event: ${obj.type}`);
        }
        if (obj.type === "agent_end") {
          process.stdout.write("\n");
          clearTimeout(timer);
          unsub();
          resolve(events);
        }
      });
    });
  }
  // Wait for a specific event type
  function waitForEvent(type, timeoutMs = 60_000) {
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        unsub();
        reject(new Error(`Timeout waiting for event: ${type}`));
      }, timeoutMs);
      const unsub = onEvent((obj) => {
        if (obj.type === type) {
          clearTimeout(timer);
          unsub();
          resolve(obj);
        }
      });
    });
  }
  function kill() {
    proc.kill("SIGTERM");
  }
  return { send, onEvent, collectUntilIdle, waitForEvent, kill };
}
// ---------------------------------------------------------------------------
// analysis
// ---------------------------------------------------------------------------
function findUserMessagesWithImages(events) {
  const results = [];
  for (const ev of events) {
    if (ev.type !== "message_start") continue;
    const msg = ev.message;
    if (msg?.role !== "user") continue;
    const content = msg.content;
    if (!Array.isArray(content)) continue;
    const imageBlocks = content.filter((c) => c.type === "image");
    results.push({
      textBlocks: content.filter((c) => c.type === "text").length,
      imageBlocks: imageBlocks.length,
      contentTypes: content.map((c) => c.type),
    });
  }
  return results;
}
function printUserMessages(userMsgs) {
  log("User messages in events:");
  if (userMsgs.length === 0) {
    log(" (none found)");
  }
  for (const m of userMsgs) {
    log(` text=${m.textBlocks}, images=${m.imageBlocks}, types=[${m.contentTypes}]`);
  }
}
// ---------------------------------------------------------------------------
// tests
// ---------------------------------------------------------------------------
async function runTests(argv) {
  const pi = spawnPi(argv);
  // Wait for startup
  await new Promise((r) => setTimeout(r, 2000));
  // Print session info
  const stateResp = await pi.send({ type: "get_state" });
  if (stateResp.success) {
    log(`Session file: ${stateResp.data.sessionFile}`);
    log(`Session ID: ${stateResp.data.sessionId}`);
    log(`Model: ${stateResp.data.model?.provider}/${stateResp.data.model?.id}`);
  }
  // ==== TEST 1 ====
  console.log("\n========================================");
  console.log(" TEST 1: prompt with image (baseline)");
  console.log("========================================\n");
  {
    const eventsPromise = pi.collectUntilIdle();
    log("Sending prompt with image...");
    const resp = await pi.send({
      type: "prompt",
      message: "Describe the image in one short sentence. Do not call any tools.",
      images: [IMAGE_CONTENT],
    });
    log(`Response: success=${resp.success}${resp.error ? ` error=${resp.error}` : ""}`);
    const events = await eventsPromise;
    const userMsgs = findUserMessagesWithImages(events);
    printUserMessages(userMsgs);
    logResult("prompt with image (non-streaming)", userMsgs.some((m) => m.imageBlocks > 0));
  }
  // ==== TEST 2 ====
  console.log("\n========================================");
  console.log(" TEST 2: steer with image (streaming)");
  console.log("========================================\n");
  {
    // Start collecting before sending anything
    const eventsPromise = pi.collectUntilIdle();
    // Also watch for tool_execution_start so we know when to steer
    const toolStartPromise = pi.waitForEvent("tool_execution_start");
    log("Sending prompt that triggers tool use (sleep 3)...");
    const resp = await pi.send({
      type: "prompt",
      message: 'Run this bash command: echo "tool running" && sleep 3 && echo "done"',
    });
    log(`Response: success=${resp.success}${resp.error ? ` error=${resp.error}` : ""}`);
    // Wait for tool execution to start -- agent is now definitely streaming
    log("Waiting for tool_execution_start...");
    await toolStartPromise;
    log("Tool executing. Sending steer with image...");
    const steerResp = await pi.send({
      type: "steer",
      message: "Stop. Describe this image in one sentence instead. Do not call any tools.",
      images: [IMAGE_CONTENT],
    });
    log(`Steer response: success=${steerResp.success}${steerResp.error ? ` error=${steerResp.error}` : ""}`);
    const events = await eventsPromise;
    const userMsgs = findUserMessagesWithImages(events);
    printUserMessages(userMsgs);
    // The steered message (2nd user message) should have an image
    const steerMsg = userMsgs.length >= 2 ? userMsgs[1] : userMsgs[0];
    const hasImages = steerMsg?.imageBlocks > 0;
    logResult("steer with image (during streaming)", hasImages);
  }
  // ==== TEST 3 ====
  console.log("\n========================================");
  console.log(" TEST 3: follow_up with image (streaming)");
  console.log("========================================\n");
  {
    // Collect first agent cycle
    const events1Promise = pi.collectUntilIdle();
    // Watch for tool execution start
    const toolStartPromise = pi.waitForEvent("tool_execution_start");
    log("Sending prompt that triggers tool use (sleep 2)...");
    const resp = await pi.send({
      type: "prompt",
      message: 'Run this bash command: echo "running" && sleep 2 && echo "done"',
    });
    log(`Response: success=${resp.success}${resp.error ? ` error=${resp.error}` : ""}`);
    log("Waiting for tool_execution_start...");
    await toolStartPromise;
    log("Tool executing. Sending follow_up with image...");
    const fuResp = await pi.send({
      type: "follow_up",
      message: "Now describe this image in one sentence. Do not call any tools.",
      images: [IMAGE_CONTENT],
    });
    log(`Follow-up response: success=${fuResp.success}${fuResp.error ? ` error=${fuResp.error}` : ""}`);
    const events1 = await events1Promise;
    // Follow-up may be processed in the same agent cycle (if delivered
    // while agent was still running) or trigger a second cycle.
    // Check if we already got the follow-up user message.
    let allEvents = events1;
    const userMsgsSoFar = findUserMessagesWithImages(events1);
    if (userMsgsSoFar.length < 2) {
      log("Follow-up not yet delivered. Waiting for second agent cycle...");
      const events2 = await pi.collectUntilIdle();
      allEvents = [...events1, ...events2];
    } else {
      log("Follow-up delivered in same agent cycle.");
    }
    const userMsgs = findUserMessagesWithImages(allEvents);
    printUserMessages(userMsgs);
    // The follow-up message (2nd user message) should have an image
    const fuMsg = userMsgs.length >= 2 ? userMsgs[1] : undefined;
    const hasImages = fuMsg?.imageBlocks > 0;
    logResult("follow_up with image (during streaming)", hasImages);
  }
  console.log("\n========================================");
  console.log(" DONE");
  console.log("========================================\n");
  pi.kill();
  process.exit(0);
}
// ---------------------------------------------------------------------------
// main
// ---------------------------------------------------------------------------
const argv = process.argv.slice(2);
if (argv.length === 0) {
  console.error("Usage: node rpc-test.mjs <pi-command> [args...]");
  console.error(" e.g. node rpc-test.mjs pi");
  console.error(" e.g. node rpc-test.mjs node ./packages/coding-agent/dist/cli.js");
  process.exit(1);
}
runTests(argv).catch((err) => {
  console.error("Fatal:", err);
  process.exit(1);
});

test-image-ext.ts

/**
 * Extension for testing image delivery through sendUserMessage during streaming.
 *
 * Install:
 *   cp test-image-ext.ts ~/.pi/agent/extensions/
 *
 * Usage (while the agent is streaming):
 *   /test-image /path/to/image.png
 *
 * What it does:
 *   1. Reads the image at the given path, base64-encodes it
 *   2. Calls pi.sendUserMessage() with text + image, using deliverAs: "steer"
 *   3. Logs all user messages (via agent_end) to a file, recording whether
 *      each message's content array includes image blocks
 *
 * Check results:
 *   cat /tmp/test-image-ext.log
 *
 * The log shows each user message's content types. If images are delivered
 * correctly, you'll see entries like:
 *   [user] content types: text, image
 * If images are dropped:
 *   [user] content types: text
 */
import type { ExtensionAPI } from "@anthropic/pi-extension";
import { readFileSync, appendFileSync, writeFileSync } from "node:fs";
import { extname } from "node:path";
const LOG_FILE = "/tmp/test-image-ext.log";
const MIME_MAP: Record<string, string> = {
  ".png": "image/png",
  ".jpg": "image/jpeg",
  ".jpeg": "image/jpeg",
  ".gif": "image/gif",
  ".webp": "image/webp",
};
function logToFile(msg: string) {
  const ts = new Date().toISOString().slice(11, 23);
  appendFileSync(LOG_FILE, `[${ts}] ${msg}\n`);
}
export default function (pi: ExtensionAPI) {
  // Reset log on session start
  pi.on("session_start", () => {
    writeFileSync(LOG_FILE, `--- session started at ${new Date().toISOString()} ---\n`);
  });
  // Log all messages at agent_end so we can see what was delivered
  pi.on("agent_end", (event) => {
    logToFile(`agent_end: ${event.messages.length} messages`);
    for (const msg of event.messages) {
      if (msg.role === "user") {
        const content = msg.content;
        if (Array.isArray(content)) {
          const types = content.map((c: { type: string }) => c.type);
          logToFile(` [user] content types: ${types.join(", ")}`);
        } else {
          logToFile(` [user] content is string (no image possible)`);
        }
      } else {
        logToFile(` [${msg.role}] (skipped)`);
      }
    }
  });
  pi.registerCommand("test-image", {
    description: "Send a user message with an image via steer. Usage: /test-image <path>",
    handler: async (args, ctx) => {
      const imagePath = args.trim();
      if (!imagePath) {
        ctx.ui.notify("Usage: /test-image /path/to/image.png", "error");
        return;
      }
      const ext = extname(imagePath).toLowerCase();
      const mediaType = MIME_MAP[ext];
      if (!mediaType) {
        ctx.ui.notify(`Unsupported image type: ${ext}`, "error");
        return;
      }
      let data: string;
      try {
        data = readFileSync(imagePath).toString("base64");
      } catch (err) {
        ctx.ui.notify(`Failed to read image: ${err}`, "error");
        return;
      }
      logToFile(`/test-image: sending image ${imagePath} (${mediaType}, ${data.length} base64 chars)`);
      logToFile(`/test-image: isIdle=${ctx.isIdle()}`);
      pi.sendUserMessage(
        [
          { type: "text", text: "Stop. Describe this image in one sentence. Do not call any tools." },
          {
            type: "image",
            data,
            mimeType: mediaType,
          },
        ],
        { deliverAs: "steer" },
      );
      logToFile("/test-image: sendUserMessage called with deliverAs=steer");
      ctx.ui.notify("Image steered. Check /tmp/test-image-ext.log after agent finishes.");
    },
  });
}