Created
December 23, 2025 13:42
-
-
Save sim51/073737e5912d1383367305ff8d226634 to your computer and use it in GitHub Desktop.
LlamaIndex workflow for elasticsearch query
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { Client } from '@elastic/elasticsearch'; | |
| import { isNil } from 'lodash'; | |
| import config from '../config'; | |
/** Shared Elasticsearch client, connected to the node configured in `config.elastic.host`. */
export const client = new Client({ node: config.elastic.host });
/**
 * Fetches the field mappings (`mappings.properties`) of an Elasticsearch index.
 *
 * @param index - Index name to look up. An alias or pattern is tolerated: see
 *   the fallback below.
 * @returns The `properties` object of the index mapping (may be `undefined`
 *   when the index has no mapped fields).
 * @throws Error when Elasticsearch returns no mapping entry at all for `index`.
 */
export async function getIndexMappings(index: string) {
  const mapping = await client.indices.getMapping({ index });
  // The response is keyed by concrete index name, so when `index` is an alias
  // or a wildcard pattern `mapping[index]` is missing. Fall back to the first
  // returned entry instead of crashing on `undefined.mappings`.
  const entry = mapping[index] ?? Object.values(mapping)[0];
  if (entry === undefined) {
    throw new Error(`No mapping found for index "${index}"`);
  }
  return entry.mappings.properties;
}
/**
 * Runs a search against an Elasticsearch index from a JSON-encoded request body.
 *
 * @param index - Name of the index to search.
 * @param query - JSON string of the search body; it must decode to an object
 *   that has a top-level `query` key.
 * @returns The non-empty array of hits returned by the search.
 * @throws Error `'Error: Query JSON no valid format'` when `query` is not valid JSON.
 * @throws Error `"Error: Query JSON must contain a 'query' key"` when the decoded
 *   value is not an object with a `query` key.
 * @throws Error `'Query has no results'` when the search yields no hit.
 * @throws Any error raised by the Elasticsearch client is propagated unchanged.
 */
export async function queryIndex(index: string, query: string) {
  // `JSON.parse` returns `any`; the value is validated right below before use.
  let parsedQuery;
  try {
    parsedQuery = JSON.parse(query);
  } catch (e) {
    // Only a parsing failure is reported as a format error; the guard is scoped
    // to JSON.parse so unrelated errors are never mislabelled.
    if (e instanceof SyntaxError) throw new Error('Error: Query JSON no valid format');
    throw e;
  }

  // Valid JSON may still be `null`, a number, a string or an array. The `in`
  // operator throws a TypeError on primitives, so check the shape first and
  // report the same, actionable message for every malformed body.
  const isPlainObject =
    typeof parsedQuery === 'object' && parsedQuery !== null && !Array.isArray(parsedQuery);
  if (!isPlainObject || !('query' in parsedQuery)) {
    throw new Error("Error: Query JSON must contain a 'query' key");
  }

  const response = await client.search({ index, body: parsedQuery });
  const hits = response.hits.hits;
  if (isNil(hits) || hits.length === 0) throw new Error('Query has no results');
  return hits;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Check https://www.elastic.co/search-labs/blog/llamaindex-workflows-with-elasticsearch | |
| import { Ollama } from '@llamaindex/ollama'; | |
| import { createStatefulMiddleware, createWorkflow, workflowEvent } from '@llamaindex/workflow'; | |
| import { Settings } from 'llamaindex'; | |
| import config from '../../../config'; | |
| import { getIndexMappings, queryIndex } from '../../elastic'; | |
// Use an Ollama model as the LLM registered on the llamaindex `Settings` object.
// Both the model name and the Ollama server URL come from the application config.
Settings.llm = new Ollama({ model: config.ollama.model, config: { host: config.ollama.url } });
// Workflow events exchanged between the steps.

/** Fields carried by every intermediate event: the target index and the user request. */
type SearchInput = { index: string; query: string };

/** Entry point of the workflow. */
const startEvent = workflowEvent<SearchInput>();

/** Emitted when a generated query failed; carries the failing output and its error message. */
const validationErrorEvent = workflowEvent<SearchInput & { error: string; wrong_output: string }>();

/** Emitted once the LLM produced a candidate query (`output`). */
const extractionDone = workflowEvent<SearchInput & { output: string }>();

/** Terminal event: the search result, plus an optional error message. */
const stopEvent = workflowEvent<{ result: Array<any>; error?: string }>();
// Build the workflow with a per-context state that tracks how many attempts
// were made (`numIterations`) against the allowed maximum (`maxIterations`).
const { withState } = createStatefulMiddleware(() => {
  return { numIterations: 0, maxIterations: 3 };
});
const searchTextUnits = withState(createWorkflow());
/**
 * Asks the LLM to write an Elasticsearch query for `query` against `index`.
 *
 * Shared by the start step and the retry step, which previously duplicated this
 * whole body. The prompt embeds the user request and the index mapping and,
 * on a retry, the previously generated output with the error it caused.
 *
 * @param index - Index whose mapping is sent to the LLM and that will be searched.
 * @param query - User request, inserted in the prompt as context information.
 * @param previousFailure - Output and error of the last failed attempt; omit on
 *   the first attempt.
 * @returns An `extractionDone` event carrying the raw LLM text as `output`.
 */
async function buildQuery(
  index: string,
  query: string,
  previousFailure?: { wrong_output: string; error: string },
) {
  const schema = await getIndexMappings(index);

  // Only a retry has something to reflect on; the first attempt adds nothing.
  const reflection_prompt =
    previousFailure === undefined
      ? ''
      : `
    You already created this output previously:
    ---------------------
    ${previousFailure.wrong_output}
    ---------------------
    This caused the error: ${previousFailure.error}
    `;

  const prompt = `
    Context information is below:
    ---------------------
    ${query}
    ---------------------
    This is the index mapping
    ---------------------
    ${JSON.stringify(schema, null, 2)}
    ---------------------
    Analyze it to know wich field is defined with its type
    Given the context information and not prior knowledge, create a Elasticsearch query from the information in the context.
    The query must return the documents that match with query and the context information and the query used for retrieve the results.
    ${reflection_prompt}
    The response must contain only a valid Elasticsearch query. Do not add any sentence before or after the JSON object.
    `;

  const output = await Settings.llm.complete({ prompt });
  console.log('Prompt is', prompt);
  console.log('ES query is', output.text);
  return extractionDone.with({ output: output.text, query, index });
}

// First attempt: build the query from the user request alone.
searchTextUnits.handle([startEvent], async (_context, event) => {
  console.log('=== BUILD QUERY ===');
  const { index, query } = event.data;
  return buildQuery(index, query);
});

// Retry: rebuild the query, telling the LLM what it produced and why it failed.
searchTextUnits.handle([validationErrorEvent], async (_context, event) => {
  console.log('=== BUILD QUERY RETRY ===');
  const { index, query, wrong_output, error } = event.data;
  return buildQuery(index, query, { wrong_output, error });
});
/**
 * Result step: runs the LLM-generated query and decides how the workflow continues.
 *
 * - On success, stops the workflow with the hits.
 * - On failure, requests a new query through `validationErrorEvent` until
 *   `maxIterations` attempts have been made, then stops with an empty result
 *   and the error `'Max attempt'`.
 *
 * The attempt limit used to be enforced by a `return` inside `finally`, which
 * overrode the value returned by `try`/`catch`. A successful search on the last
 * allowed pass was therefore thrown away, after paying for one more LLM call
 * and one more search. The limit is now checked only on the failure path.
 */
searchTextUnits.handle([extractionDone], async (context, event) => {
  console.log('=== RESULT STEP ===');
  const { index, query, output } = event.data;

  // Count this attempt before running it so the failure path can compare
  // the number of attempts made against the maximum.
  context.state.numIterations += 1;

  try {
    const result = await queryIndex(index, output);
    return stopEvent.with({ result });
  } catch (e) {
    if (context.state.numIterations >= context.state.maxIterations) {
      return stopEvent.with({ result: [], error: 'Max attempt' });
    }
    return validationErrorEvent.with({
      index,
      query,
      // Anything can be thrown; only read `.message` from real Error objects.
      error: e instanceof Error ? e.message : String(e),
      wrong_output: output,
    });
  }
});
// Usage
/**
 * Runs the workflow once with a sample request and prints the `_source` of
 * every returned hit.
 *
 * The event stream is consumed directly rather than inside a
 * `new Promise(async ...)` executor: with the wrapper, an error thrown while
 * iterating never settled the promise and never reached `main().catch`.
 * Iteration stops at the first stop event, and the error it may carry is
 * logged instead of being dropped.
 */
async function main() {
  const { stream, sendEvent } = searchTextUnits.createContext();
  sendEvent(
    startEvent.with({
      index: 'tut_rag_textunit_1766493636506',
      query:
        'search textunits with sources that must macth the following assertions : * gender "MAN" * under 20 years old * leaving in country Denmark',
    }),
  );
  console.log('start');

  // Stays `undefined` if the stream ends without emitting a stop event.
  let result: unknown;
  for await (const event of stream) {
    if (stopEvent.include(event)) {
      if (event.data.error !== undefined) console.error('error', event.data.error);
      result = event.data.result.map((e) => e._source);
      // The workflow is finished: stop reading the stream.
      break;
    }
  }
  console.log('result', result);
}
main().catch(console.error);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment