Skip to content

Instantly share code, notes, and snippets.

@sim51
Created December 23, 2025 13:42
Show Gist options
  • Select an option

  • Save sim51/073737e5912d1383367305ff8d226634 to your computer and use it in GitHub Desktop.

Select an option

Save sim51/073737e5912d1383367305ff8d226634 to your computer and use it in GitHub Desktop.
LlamaIndex workflow for elasticsearch query
import { Client } from '@elastic/elasticsearch';
import { isNil } from 'lodash';
import config from '../config';
/**
 * Shared Elasticsearch client used by the helpers in this module.
 * It connects to the node address read from `config.elastic.host`.
 */
export const client = new Client({
node: config.elastic.host,
});
/**
 * Get the field mappings (`mappings.properties`) of an index.
 *
 * The get-mapping response is keyed by concrete index name. When `index` is an
 * alias, the requested name is therefore not a key of the response; in that
 * case the mappings of the single returned index are used instead.
 *
 * @param index Name (or alias) of the index to inspect.
 * @returns The `properties` object of the resolved index mapping.
 * @throws Error when the response does not allow resolving exactly one mapping
 *   for `index`. Errors raised by the client are propagated unchanged.
 */
export async function getIndexMappings(index: string) {
  const mapping = await client.indices.getMapping({ index });

  // Fast path: the requested name is a concrete index present in the response.
  const exact = mapping[index];
  if (exact !== undefined) return exact.mappings.properties;

  // Alias path: accept the response only when it is unambiguous.
  const [first, ...others] = Object.values(mapping);
  if (first === undefined || others.length > 0) {
    throw new Error(
      `Unable to resolve a single mapping for "${index}" (${Object.keys(mapping).length} indices returned)`,
    );
  }
  return first.mappings.properties;
}
/**
 * Run a search against an index from a JSON-encoded request body.
 *
 * @param index Name of the index to search.
 * @param query JSON string of the search body; it must decode to an object
 *   that has a top-level `query` key.
 * @returns The non-empty array of hits (`hits.hits`) of the search response.
 * @throws Error when `query` is not valid JSON, when it does not decode to an
 *   object holding a `query` key, or when the search returns no hit. Errors
 *   raised by the client are propagated unchanged.
 */
export async function queryIndex(index: string, query: string) {
  // Only the parsing step is guarded, so that a SyntaxError coming from
  // anywhere else is never reported as a JSON formatting problem.
  let parsedQuery;
  try {
    parsedQuery = JSON.parse(query);
  } catch (e) {
    if (e instanceof SyntaxError) throw new Error('Error: Query JSON no valid format');
    throw e;
  }

  // The `in` operator throws a TypeError on primitives and `null` (all of
  // which are valid JSON documents), so the value is checked to be an object first.
  const isObject = typeof parsedQuery === 'object' && parsedQuery !== null;
  if (!isObject || !('query' in parsedQuery)) {
    throw new Error("Error: Query JSON must contain a 'query' key");
  }

  const response = await client.search({ index, body: parsedQuery });
  const hits = response['hits']['hits'];
  if (isNil(hits) || hits.length === 0) throw new Error('Query has no results');
  return hits;
}
// See https://www.elastic.co/search-labs/blog/llamaindex-workflows-with-elasticsearch
import { Ollama } from '@llamaindex/ollama';
import { createStatefulMiddleware, createWorkflow, workflowEvent } from '@llamaindex/workflow';
import { Settings } from 'llamaindex';
import config from '../../../config';
import { getIndexMappings, queryIndex } from '../../elastic';
// Use the Ollama model and host from the config as the LLM exposed through `Settings.llm`.
Settings.llm = new Ollama({
model: config.ollama.model,
config: {
host: config.ollama.url,
},
});
// Define our workflow events
// startEvent: entry point, carries the target index and the natural-language request.
const startEvent = workflowEvent<{
index: string;
query: string;
}>();
// validationErrorEvent: emitted by the result step when running the generated query fails;
// carries the error message and the rejected output alongside the original request.
const validationErrorEvent = workflowEvent<{
error: string;
wrong_output: string;
index: string;
query: string;
}>();
// extractionDone: emitted by the query-building handlers with the text returned by the LLM.
const extractionDone = workflowEvent<{ output: string; index: string; query: string }>();
// stopEvent: final event, carries the hits (or an empty array together with an error message).
const stopEvent = workflowEvent<{ result: Array<any>; error?: string }>();
// Create our workflow
// The state holds the attempt counter maintained by the result step (`numIterations`)
// and the bound it is compared against before giving up (`maxIterations`).
const { withState } = createStatefulMiddleware(() => ({
numIterations: 0,
maxIterations: 3,
}));
const searchTextUnits = withState(createWorkflow());
/**
 * Ask the LLM to turn a natural-language request into an Elasticsearch query.
 *
 * The prompt contains the request, the mapping of the target index and, when a
 * previous attempt failed, the rejected output together with the error it
 * caused so the model can correct itself.
 *
 * @param index Index whose mapping is given to the model.
 * @param query Natural-language description of the documents to find.
 * @param previous Rejected output and error of the previous attempt, if any.
 * @returns The raw text returned by the LLM (expected to be a JSON query).
 */
async function generateEsQuery(
  index: string,
  query: string,
  previous?: { wrong_output: string; error: string },
): Promise<string> {
  const schema = await getIndexMappings(index);

  // Template contents are kept flush-left: their whitespace is part of the prompt.
  const reflection_prompt =
    previous === undefined
      ? ''
      : `
You already created this output previously:
---------------------
${previous.wrong_output}
---------------------
This caused the error: ${previous.error}
`;

  const prompt = `
Context information is below:
---------------------
${query}
---------------------
This is the index mapping
---------------------
${JSON.stringify(schema, null, 2)}
---------------------
Analyze it to know wich field is defined with its type
Given the context information and not prior knowledge, create a Elasticsearch query from the information in the context.
The query must return the documents that match with query and the context information and the query used for retrieve the results.
${reflection_prompt}
The response must contain only a valid Elasticsearch query. Do not add any sentence before or after the JSON object.
`;

  const output = await Settings.llm.complete({ prompt });
  console.log('Prompt is', prompt);
  console.log('ES query is', output.text);
  return output.text;
}

// First attempt: build the query from the request alone.
searchTextUnits.handle([startEvent], async (_context, event) => {
  console.log('=== BUILD QUERY ===');
  const { index, query } = event.data;
  const output = await generateEsQuery(index, query);
  return extractionDone.with({ output, query, index });
});

// Retry: build the query again, telling the model what went wrong last time.
searchTextUnits.handle([validationErrorEvent], async (_context, event) => {
  console.log('=== BUILD QUERY RETRY ===');
  const { index, query, wrong_output, error } = event.data;
  const output = await generateEsQuery(index, query, { wrong_output, error });
  return extractionDone.with({ output, query, index });
});
/**
 * Result step: run the generated query against Elasticsearch.
 *
 * - On success the workflow stops with the hits.
 * - On failure the attempt counter is incremented; the workflow either asks
 *   for a new query (passing the error and the rejected output back to the
 *   model) or, once `maxIterations` attempts have failed, stops with an empty
 *   result and the `'Max attempt'` error.
 *
 * The decision is taken inside `catch` rather than in a `finally` block: a
 * `return` in `finally` replaces the value returned by `try`, which would
 * discard a successful result obtained on the last attempt.
 */
searchTextUnits.handle([extractionDone], async (context, event) => {
  console.log('=== RESULT STEP ===');
  const { index, query, output } = event.data;
  try {
    const result = await queryIndex(index, output);
    return stopEvent.with({ result });
  } catch (e) {
    context.state.numIterations += 1;
    if (context.state.numIterations >= context.state.maxIterations) {
      return stopEvent.with({ result: [], error: 'Max attempt' });
    }
    return validationErrorEvent.with({
      index,
      query,
      // Anything can be thrown in JavaScript, so do not assume an Error instance.
      error: e instanceof Error ? e.message : String(e),
      wrong_output: output,
    });
  }
});
// Usage
/**
 * Run the workflow once with a sample request and print the `_source` of the
 * documents it found.
 *
 * The event stream is consumed directly in this async function (no
 * `new Promise(async ...)` wrapper), so any error raised while iterating
 * rejects the returned promise and reaches the `.catch` below.
 *
 * @throws Error when the stream ends without emitting a stop event.
 */
async function main() {
  const { stream, sendEvent } = searchTextUnits.createContext();
  sendEvent(
    startEvent.with({
      index: 'tut_rag_textunit_1766493636506',
      query:
        'search textunits with sources that must macth the following assertions : * gender "MAN" * under 20 years old * leaving in country Denmark',
    }),
  );
  console.log('start');

  for await (const event of stream) {
    if (!stopEvent.include(event)) continue;

    // Surface the reason when the workflow gave up.
    if (event.data.error !== undefined) console.error('error', event.data.error);
    const result = event.data.result.map((e) => e._source);
    console.log('result', result);
    // Leaving the loop stops consuming the stream once the workflow is done.
    return;
  }

  throw new Error('Workflow stream ended without a stop event');
}
main().catch(console.error);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment