Skip to content

Instantly share code, notes, and snippets.

@slinkydeveloper
Last active February 12, 2026 19:23
Show Gist options
  • Select an option

  • Save slinkydeveloper/d929a67027a7192e4829bc7ece7cd64c to your computer and use it in GitHub Desktop.

Select an option

Save slinkydeveloper/d929a67027a7192e4829bc7ece7cd64c to your computer and use it in GitHub Desktop.
import * as restate from "@restatedev/restate-sdk";
import {RestatePromise, TerminalError} from "@restatedev/restate-sdk";
import {readFile} from "fs/promises";
import {join} from "path";
import type * as dosswf from "./workflow_dsl_types"
import * as dosswfruntime from "./workflow_interpreter_runtime"
type ActionStackElement = { id: number, stepIds: string[] };
type ExecuteRequest = {
steps: dosswf.ActionStep[];
actionStack: ActionStackElement[];
thisStackId: number;
// Filled only if this is a subworkflow within MAP!
iterationIndex?: number;
};
const TRIGGER_KEY = "trigger";
const workflowRunner = restate.service({
name: "WorkflowRunner",
handlers: {
start: async (ctx: restate.Context, {workflowDefinition, trigger}: {
workflowDefinition: string,
trigger: any
}) => {
const workflow = await ctx.run(`load ${workflowDefinition} workflow`, () => loadWorkflowFromFile(workflowDefinition));
const workflowKey = ctx.rand.uuidv4();
// Init the WorkflowResults tree
await ctx.objectClient(workflowResultsTree, workflowKey).init(trigger);
// Generate root workflow key and start
const workflowPromise = ctx.objectClient(workflowInterpreter, workflowKey).interpret({
steps: workflow.steps,
actionStack: [],
thisStackId: 0,
});
const invocationId = await workflowPromise.invocationId;
try {
// Await completion of workflow
await workflowPromise;
} finally {
// Schedule cleanup in 24hrs from when workflow promise completes
ctx.objectSendClient(workflowResultsTree, workflowKey).cleanup(restate.rpc.sendOpts({delay: {hours: 24}}));
}
return {
workflowKey,
invocationId
}
}
}
})
async function getAndFilter(ctx: restate.ObjectContext, key: string, innerExpression?: string) {
let value = await ctx.get(key);
if (!value) {
throw new TerminalError(`Key ${key} not found`, {errorCode: 404})
}
if (innerExpression) {
value = dosswfruntime.expressionResult(value, innerExpression)
}
return value
}
function stepResultStateKey(stack: number[], stepId: string) {
return `${["stepResults"].concat(stack.map(n => n.toString())).join("/")}/${stepId}`;
}
function dataStoreStateKey(dataStoreKey: string) {
return `datastore/${dataStoreKey}`;
}
const workflowResultsTree = restate.object({
name: "WorkflowResultsTree",
handlers: {
// --- Readers ---
getTrigger: (ctx: restate.ObjectContext, {innerExpression}: { innerExpression?: string }) =>
getAndFilter(ctx, TRIGGER_KEY, innerExpression),
getStepResult: async (ctx: restate.ObjectContext, {stack, stepId, innerExpression}: {
stack: number[],
stepId: string,
innerExpression?: string
}) =>
getAndFilter(ctx, stepResultStateKey(stack, stepId), innerExpression),
getDataStoreValue: (ctx: restate.ObjectContext, {dataStoreKey, innerExpression}: {
dataStoreKey: string,
innerExpression?: string
}) =>
getAndFilter(ctx, dataStoreStateKey(dataStoreKey), innerExpression),
// --- Writers ---
init: async (ctx: restate.ObjectContext, {trigger}: { trigger: any }) => {
ctx.set(TRIGGER_KEY, trigger);
},
setStepResult: async (ctx: restate.ObjectContext, {stack, stepId, value}: {
stack: number[],
stepId: string,
value: any
}) => {
ctx.set(stepResultStateKey(stack, stepId), value);
},
setDataStoreValue: async (ctx: restate.ObjectContext, {dataStoreKey, value}: {
dataStoreKey: string,
value: any
}) => {
// TODO ask @dbt merging rules for data store values
ctx.set(dataStoreStateKey(dataStoreKey), value);
},
cleanup: async (ctx: restate.ObjectContext) => {
ctx.clearAll();
},
},
options: {
// An optimization to avoid loading all state entries (in all the handlers we usually just need to read/write one)
enableLazyState: true,
// This service should just be called by WorkflowRunner
ingressPrivate: true
}
})
const workflowInterpreter = restate.object({
name: "WorkflowInterpreter",
handlers: {
interpret: restate.createObjectSharedHandler(async (ctx: restate.ObjectSharedContext, req: ExecuteRequest) => {
// Step results
const stepResults: Record<string, dosswf.StepResult> = {};
const currentStack = req.actionStack.map(s => s.id).concat([req.thisStackId]);
// Will be incremented to build the stack
let childrenCount = 0;
// Factory for the step invocation context
const stepExecutionContext = <T>(stepDefinition: T): dosswfruntime.StepExecutionContext<T> => ({
stepDefinition,
restate: ctx,
mapIterationIndex: req.iterationIndex,
getTrigger:
async (innerExpression) => ctx.objectClient(workflowResultsTree, ctx.key).getTrigger({innerExpression}),
getStepResult: async (stepId, innerExpression) => {
// Let's first check if we have this locally
let value = stepResults[stepId];
if (value !== undefined) {
// Found it locally, let's apply the expression and we're good to go
if (innerExpression !== undefined && value.type === "success") {
// Apply filter
value.result = dosswfruntime.expressionResult(value.result, innerExpression);
}
} else {
// Find this in the stack
const stack = [];
let found = false;
for (const stackElement of req.actionStack) {
stack.push(stackElement.id);
if (stackElement.stepIds.find(stackStepId => stackStepId === stepId)) {
// Found it
found = true;
break;
}
}
if (!found) {
throw new TerminalError(`Step ${stepId} not found in action stack, looks like a bug in the workflow definition?`);
}
// Ok we've found who in the stack has this step result, let's query it
value = ctx.objectClient(workflowResultsTree, ctx.key).getStepResult({stack, stepId, innerExpression}) as unknown as dosswf.StepResult;
}
return value;
},
getDataStoreValue: async (dataStoreKey: string, innerExpression?: string) => ctx.objectClient(workflowResultsTree, ctx.key).getDataStoreValue({dataStoreKey, innerExpression}),
})
for (const step of req.steps) {
// Prepare the stack element to propagate, and the child key
const getActionStack = () => {
const stackElement: ActionStackElement = {
id: req.thisStackId,
stepIds: Object.keys(stepResults)
};
return req.actionStack.concat([stackElement]);
}
switch (step.type) {
case "query": {
stepResults[step.id] = await dosswfruntime.tableQuery(stepExecutionContext(step));
ctx.objectSendClient(workflowResultsTree, ctx.key).setStepResult({stack: currentStack, stepId: step.id, value: stepResults[step.id]});
break;
}
case "select": {
// Evaluate condition and decide which branch to go to
const conditionEvaluationResult = await dosswfruntime.evaluateConditionExpression(stepExecutionContext(step));
const branch = conditionEvaluationResult ? step.trueBranch : step.falseBranch;
const newStackElementId = childrenCount;
childrenCount++;
await ctx.objectClient(workflowInterpreter, ctx.key).interpret({
steps: branch,
actionStack:getActionStack(),
thisStackId: newStackElementId
})
break;
}
case "map": {
const numberOfIterations = await dosswfruntime.evaluateInputExpressionArraySize(stepExecutionContext(step));
const actionStack = getActionStack();
// Create various child invocations
const childInterpreterPromises: RestatePromise<void>[] = [];
for (let iterationIndex = 0; iterationIndex < numberOfIterations; iterationIndex++) {
const newStackElementId = childrenCount;
childrenCount++;
childInterpreterPromises.push(
ctx.objectClient(workflowInterpreter, ctx.key).interpret({
steps: step.sub,
actionStack,
thisStackId: newStackElementId,
iterationIndex
})
);
}
// Now await all child invocations to complete
await RestatePromise.all(childInterpreterPromises);
break;
}
}
}
}),
},
options: {
// This service should just get called by WorkflowRunner
ingressPrivate: true
}
});
async function loadWorkflowFromFile(id: string): Promise<dosswf.ActionDefinition> {
const filePath = join(process.cwd(), 'workflow', `${id}.json`);
const fileContent = await readFile(filePath, 'utf-8');
return JSON.parse(fileContent) as dosswf.ActionDefinition;
}
restate.serve({
services: [workflowRunner, workflowInterpreter, workflowResultsTree],
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment