Created
February 1, 2026 19:47
-
-
Save bchewy/e99558b051aa439eac3072da5d686269 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const { Pool } = require('pg'); | |
// --- Embedding request settings --------------------------------------------
const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
const EMBEDDING_MODEL = 'text-embedding-3-large';
// Output vector length requested from the API (sent as `dimensions`).
const EMBEDDING_DIMENSIONS = 2000;

// --- Throughput tuning ------------------------------------------------------
const BATCH_SIZE = 100; // texts per embeddings request
const CONCURRENCY = 15; // embeddings requests in flight at the same time
// Messages pulled from the database per round (100 * 15 = 1500).
const CHUNK_SIZE = CONCURRENCY * BATCH_SIZE;

// --- Database ---------------------------------------------------------------
// More connections than CONCURRENCY so each parallel batch's upsert can get one.
const pool = new Pool({
  max: 20,
  connectionString: process.env.DATABASE_URL,
});
/**
 * Request embeddings for a batch of texts from the OpenAI embeddings API.
 *
 * Rate-limit (429) and server (5xx) responses, and network-level fetch
 * failures, are retried with jittered exponential backoff. Without retries,
 * one throttled request among many concurrent batches would abort the whole
 * run. Any other non-2xx status is thrown immediately.
 *
 * @param {string[]} texts - Inputs to embed. An empty array returns [] without a request.
 * @param {object} [options]
 * @param {number} [options.maxRetries=5] - Retries allowed after the first attempt.
 * @param {number} [options.baseDelayMs=1000] - Backoff before the first retry; doubles on each retry.
 * @param {Function} [options.fetchImpl=fetch] - fetch-compatible function (injectable for tests).
 * @returns {Promise<number[][]>} One embedding per input, in the same order as `texts`.
 * @throws {Error} `OpenAI error: <status> - <body>` for a non-retryable status or once
 *   retries are exhausted; the last network error once retries are exhausted; or an
 *   error when the response does not contain exactly one embedding per input.
 */
async function generateEmbeddings(texts, { maxRetries = 5, baseDelayMs = 1000, fetchImpl = fetch } = {}) {
  if (texts.length === 0) return [];

  const sleep = (ms) => new Promise((resolve) => { setTimeout(resolve, ms); });
  // Jitter (0.5x-1.5x) keeps batches that were throttled together from retrying in lockstep.
  const backoffMs = (attempt) => baseDelayMs * 2 ** attempt * (0.5 + Math.random());

  const request = {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${OPENAI_API_KEY}`,
    },
    body: JSON.stringify({
      model: EMBEDDING_MODEL,
      input: texts,
      dimensions: EMBEDDING_DIMENSIONS,
    }),
  };

  for (let attempt = 0; ; attempt += 1) {
    let response;
    try {
      response = await fetchImpl('https://api.openai.com/v1/embeddings', request);
    } catch (err) {
      // Network-level failure (DNS, reset connection, ...): treat as transient.
      if (attempt >= maxRetries) throw err;
      await sleep(backoffMs(attempt));
      continue;
    }

    if (!response.ok) {
      // Read the body for OpenAI's error detail; consuming it also lets the connection be released.
      const error = await response.text();
      const retryable = response.status === 429 || response.status >= 500;
      if (!retryable || attempt >= maxRetries) {
        throw new Error(`OpenAI error: ${response.status} - ${error}`);
      }
      await sleep(backoffMs(attempt));
      continue;
    }

    const { data } = await response.json();
    // Callers pair embeddings with their inputs by position, so a short or long
    // response must not be passed through silently.
    if (!Array.isArray(data) || data.length !== texts.length) {
      throw new Error(`OpenAI returned ${data?.length ?? 0} embeddings for ${texts.length} inputs`);
    }
    // Each item carries the index of its input; order by that rather than trusting array position.
    return [...data].sort((a, b) => a.index - b.index).map((item) => item.embedding);
  }
}
/**
 * Embed one batch of messages and upsert the vectors into message_embeddings.
 *
 * @param {{id: *, text: string}[]} messages - Rows with an `id` (cast to bigint in SQL)
 *   and the `text` to embed.
 * @returns {Promise<number>} Number of messages written (messages.length); 0 for an empty batch.
 * @throws {Error} If embedding fails, if the number of embeddings returned differs from
 *   the number of messages, or if the upsert fails.
 */
async function processBatch(messages) {
  if (messages.length === 0) return 0;

  const embeddings = await generateEmbeddings(messages.map((m) => m.text));
  // The upsert pairs ids with vectors by position; a length mismatch would attach
  // vectors to the wrong messages (or NULL-pad rows), so refuse to write at all.
  if (embeddings.length !== messages.length) {
    throw new Error(`Expected ${messages.length} embeddings, got ${embeddings.length}`);
  }

  const messageIds = messages.map((m) => m.id);
  // Each vector is sent in its "[x,y,...]" text form and cast to vector in SQL.
  const vectorLiterals = embeddings.map((e) => `[${e.join(',')}]`);

  // One round trip for the whole batch. Multi-argument unnest in FROM zips the two
  // arrays row by row, instead of relying on select-list set-returning-function semantics.
  await pool.query(`
    INSERT INTO message_embeddings (message_id, embedding)
    SELECT t.message_id, t.embedding
    FROM unnest($1::bigint[], $2::vector[]) AS t(message_id, embedding)
    ON CONFLICT (message_id) DO UPDATE
      SET embedding = EXCLUDED.embedding, created_at = NOW()
  `, [messageIds, vectorLiterals]);

  return messages.length;
}
/**
 * Backfill embeddings for every message that does not have one yet.
 *
 * Works in rounds. Each round fetches up to CHUNK_SIZE pending messages (highest id
 * first), splits them into BATCH_SIZE batches, and embeds/upserts those batches
 * concurrently via processBatch. The loop ends when the pending query returns no rows;
 * it then logs a summary and the total row count of message_embeddings.
 * "Pending" is re-queried every round, so messages embedded by an earlier round
 * (or an earlier interrupted run) are not fetched again.
 *
 * @returns {Promise<void>}
 * @throws {Error} The first failed batch of a round (rethrown after every batch in that
 *   round has settled), or any database error. The pool is closed in every case.
 */
async function run() {
  // Shared by the count and the fetch so the progress total describes the same rows
  // that are actually processed.
  const pendingMessagesSql = `
    FROM messages m
    LEFT JOIN message_embeddings me ON m.id = me.message_id
    WHERE me.message_id IS NULL
      AND m.message_text IS NOT NULL
      AND m.message_text != ''
      AND LENGTH(m.message_text) > 10
  `;

  const startTime = Date.now();
  try {
    const countResult = await pool.query(`SELECT COUNT(*) AS cnt ${pendingMessagesSql}`);
    const totalPending = Number.parseInt(countResult.rows[0].cnt, 10);
    console.log(`Total messages to embed: ${totalPending}`);

    let totalEmbedded = 0;
    for (let round = 1; ; round += 1) {
      const { rows } = await pool.query(`
        SELECT m.id, m.message_text AS text
        ${pendingMessagesSql}
        ORDER BY m.id DESC
        LIMIT $1
      `, [CHUNK_SIZE]);
      if (rows.length === 0) {
        console.log('All done!');
        break;
      }

      const batches = [];
      for (let i = 0; i < rows.length; i += BATCH_SIZE) {
        batches.push(rows.slice(i, i + BATCH_SIZE));
      }

      const roundStart = Date.now();
      // allSettled rather than all: a failing batch must not leave sibling batches
      // still writing after we give up and close the pool.
      const outcomes = await Promise.allSettled(batches.map((batch) => processBatch(batch)));
      const roundEmbedded = outcomes
        .filter((outcome) => outcome.status === 'fulfilled')
        .reduce((sum, outcome) => sum + outcome.value, 0);
      totalEmbedded += roundEmbedded;

      const failure = outcomes.find((outcome) => outcome.status === 'rejected');
      if (failure) throw failure.reason;

      const elapsedSec = (Date.now() - startTime) / 1000;
      const roundTime = ((Date.now() - roundStart) / 1000).toFixed(1);
      const remaining = totalPending - totalEmbedded;
      const rate = elapsedSec > 0 ? totalEmbedded / elapsedSec : 0;
      // Guard rate == 0 so the ETA never prints as Infinity.
      const eta = remaining > 0 && rate > 0 ? Math.round(remaining / rate) : 0;
      console.log(`Round ${round}: +${roundEmbedded} (${roundTime}s) | Total: ${totalEmbedded}/${totalPending} | Rate: ${rate.toFixed(0)}/s | ETA: ${eta}s`);
    }

    const totalTime = ((Date.now() - startTime) / 1000).toFixed(1);
    console.log(`\nCompleted ${totalEmbedded} embeddings in ${totalTime}s`);

    const finalCount = await pool.query('SELECT COUNT(*) FROM message_embeddings');
    console.log(`Total embeddings in DB: ${finalCount.rows[0].count}`);
  } finally {
    await pool.end();
  }
}
// Script entry point: start the backfill; if it rejects, print the error and
// terminate with exit status 1.
function exitOnFatalError(error) {
  console.error(error);
  process.exit(1);
}

run().catch(exitOnFatalError);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment