Skip to content

Instantly share code, notes, and snippets.

@bchewy
Created February 1, 2026 19:47
Show Gist options
  • Select an option

  • Save bchewy/e99558b051aa439eac3072da5d686269 to your computer and use it in GitHub Desktop.

Select an option

Save bchewy/e99558b051aa439eac3072da5d686269 to your computer and use it in GitHub Desktop.
// PostgreSQL connection pool (node-postgres). DATABASE_URL supplies the DSN.
const { Pool } = require('pg');
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: 20 // increase connection pool for parallel writes
});
// OpenAI embeddings configuration.
const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
const EMBEDDING_MODEL = 'text-embedding-3-large';
// Requested output dimensionality, truncated via the API's `dimensions` param.
const EMBEDDING_DIMENSIONS = 2000;
const BATCH_SIZE = 100; // messages per API call
const CONCURRENCY = 15; // parallel API calls
const CHUNK_SIZE = BATCH_SIZE * CONCURRENCY; // 1500 messages per round
/**
 * Request embedding vectors for a batch of input strings from the OpenAI API.
 * @param {string[]} texts - input strings to embed (one API call for all)
 * @returns {Promise<number[][]>} one embedding vector per input, in order
 * @throws {Error} when the API responds with a non-2xx status
 */
async function generateEmbeddings(texts) {
  const payload = {
    model: EMBEDDING_MODEL,
    input: texts,
    dimensions: EMBEDDING_DIMENSIONS,
  };
  const response = await fetch('https://api.openai.com/v1/embeddings', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${OPENAI_API_KEY}`,
    },
    body: JSON.stringify(payload),
  });
  if (!response.ok) {
    const error = await response.text();
    throw new Error(`OpenAI error: ${response.status} - ${error}`);
  }
  // Response items carry the vectors in request order.
  const { data } = await response.json();
  const embeddings = [];
  for (const item of data) {
    embeddings.push(item.embedding);
  }
  return embeddings;
}
/**
 * Embed one batch of messages and upsert the vectors into message_embeddings.
 * @param {{id: number|string, text: string}[]} messages - rows to embed
 * @returns {Promise<number>} count of messages processed in this batch
 */
async function processBatch(messages) {
  const vectors = await generateEmbeddings(messages.map((m) => m.text));
  const ids = messages.map((m) => m.id);
  // pgvector accepts vectors as '[x,y,...]' text literals.
  const literals = vectors.map((v) => '[' + v.join(',') + ']');
  // Single round-trip bulk upsert: the two unnest() calls zip row-by-row,
  // and ON CONFLICT refreshes vectors for already-embedded messages.
  await pool.query(`
INSERT INTO message_embeddings (message_id, embedding)
SELECT unnest($1::bigint[]), unnest($2::vector[])
ON CONFLICT (message_id) DO UPDATE
SET embedding = EXCLUDED.embedding, created_at = NOW()
`, [ids, literals]);
  return messages.length;
}
/**
 * Backfill embeddings for every message that does not yet have one.
 * Loop: fetch up to CHUNK_SIZE pending messages, embed them in parallel
 * batches of BATCH_SIZE, bulk-upsert the vectors, and log progress with an
 * ETA. Terminates when no eligible messages remain, then closes the pool.
 *
 * Fixes vs. original: `parseInt` now gets an explicit radix; the throughput
 * rate is kept numeric (the original coerced a `.toFixed(0)` string back into
 * arithmetic, printing "Infinity" ETAs whenever the rate rounded to "0");
 * removed the unused `elapsed` local.
 */
async function run() {
  const startTime = Date.now();
  // Get total count first so progress and ETA can be reported.
  const countResult = await pool.query(`
SELECT COUNT(*) as cnt FROM messages m
LEFT JOIN message_embeddings me ON m.id = me.message_id
WHERE me.message_id IS NULL
AND m.message_text IS NOT NULL
AND m.message_text != ''
AND LENGTH(m.message_text) > 10
`);
  // COUNT(*) arrives as a string from node-postgres; parse with explicit radix.
  const totalPending = Number.parseInt(countResult.rows[0].cnt, 10);
  console.log(`Total messages to embed: ${totalPending}`);
  let totalEmbedded = 0;
  let round = 0;
  while (true) {
    round++;
    // Fetch the next chunk of messages still lacking an embedding. The
    // anti-join shrinks every round, so no OFFSET bookkeeping is needed.
    const result = await pool.query(`
SELECT m.id, m.message_text as text
FROM messages m
LEFT JOIN message_embeddings me ON m.id = me.message_id
WHERE me.message_id IS NULL
AND m.message_text IS NOT NULL
AND m.message_text != ''
AND LENGTH(m.message_text) > 10
ORDER BY m.id DESC
LIMIT $1
`, [CHUNK_SIZE]);
    if (result.rows.length === 0) {
      console.log('All done!');
      break;
    }
    // Split into batches for parallel processing.
    const batches = [];
    for (let i = 0; i < result.rows.length; i += BATCH_SIZE) {
      batches.push(result.rows.slice(i, i + BATCH_SIZE));
    }
    const roundStart = Date.now();
    // Process batches in parallel (fail-fast: any batch error rejects the run).
    const results = await Promise.all(batches.map((batch) => processBatch(batch)));
    const roundEmbedded = results.reduce((a, b) => a + b, 0);
    totalEmbedded += roundEmbedded;
    const roundTime = ((Date.now() - roundStart) / 1000).toFixed(1);
    const remaining = totalPending - totalEmbedded;
    // Keep the rate numeric; format only for display. Guard the ETA division
    // so a zero rate never produces "Infinity"/"NaN" in the log line.
    const elapsedSec = (Date.now() - startTime) / 1000;
    const ratePerSec = elapsedSec > 0 ? totalEmbedded / elapsedSec : 0;
    const rate = ratePerSec.toFixed(0);
    const eta = remaining > 0 && ratePerSec > 0 ? (remaining / ratePerSec).toFixed(0) : 0;
    console.log(`Round ${round}: +${roundEmbedded} (${roundTime}s) | Total: ${totalEmbedded}/${totalPending} | Rate: ${rate}/s | ETA: ${eta}s`);
  }
  const totalTime = ((Date.now() - startTime) / 1000).toFixed(1);
  console.log(`\nCompleted ${totalEmbedded} embeddings in ${totalTime}s`);
  // Final count as a sanity check against the per-round running total.
  const finalCount = await pool.query('SELECT COUNT(*) FROM message_embeddings');
  console.log(`Total embeddings in DB: ${finalCount.rows[0].count}`);
  await pool.end();
}
// Entry point: surface any unhandled failure and exit non-zero.
run().catch(e => { console.error(e); process.exit(1); });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment