Integration analysis of nshkrdotcom/agent_session_manager (ASM) onto Jido's existing architecture.
ASM provides session lifecycle management, event logging, transcript reconstruction, provider adapters (Claude/Codex/Amp), policy enforcement, routing/circuit-breaking, and concurrency limiting for LLM agent interactions.
Jido already has equivalents for ~80% of this. The remaining ~20% (provider-specific LLM execution, workspace instrumentation, and a convenience session API) can be added as plugins, custom directives, and thin glue modules — no new runtime or parallel process hierarchy needed.
Core insight: Model an ASM "Session" as a Jido Agent instance. Model ASM "Events" as Jido.Thread entries. Model ASM "ProviderAdapter" as a Jido Plugin with custom directives. Everything else layers on top.

| ASM Concept | Jido Equivalent | Status |
|---|---|---|
| `Session` (lifecycle state machine) | Agent with `status` in `agent.state` | Exists |
| `Run` (single execution unit) | `Agent.cmd/2` + AgentServer signal processing | Exists |
| `Event` (append-only log entry) | `Jido.Thread.Entry` | Exists |
| `Transcript` (conversation reconstruction) | Projection function over `Jido.Thread` | New (pure function) |
| `SessionManager` (orchestrator) | `Jido.AgentServer` | Exists |
| `SessionServer` (per-session GenServer) | `Jido.AgentServer` (already per-agent) | Exists |
| `ProviderAdapter` (Claude/Codex/Amp) | `Jido.Plugin` + custom `DirectiveExec` | New (plugin) |
| `SessionStore` (persistence) | `Jido.Storage` + `Jido.Persist` | Exists |
| `DurableStore` (boundary flush) | `Jido.Persist.hibernate/2` | Exists |
| `Capability` | `Jido.Plugin` `:capabilities` field | Exists |
| `Manifest` / `Registry` | `Jido.Discovery` | Exists |
| `Policy` (limits, tool rules) | Plugin via `handle_signal/2` | New (plugin) |
| `ConcurrencyLimiter` | Plugin + `max_queue_size` + child process | New (plugin) |
| `CapabilityResolver` | `Jido.Plugin.Requirements` | Exists |
| `Error` | `Jido.Error` (Splode-based) | Exists |
| `EventNormalizer` | Thread entry `kind` conventions | New (convention) |
| `RunQueue` | `AgentServer.State.queue` (`:queue`) | Exists |
| `CircuitBreaker` / `ProviderRouter` | Plugin with stateful routing | New (plugin) |
| `Workspace` (snapshots, diffs) | New plugin + storage adapter | New (plugin) |
| `ArtifactStore` | Extend `Jido.Storage` or new behaviour | New |
| `StreamSession` | Signal-based streaming pattern | New (pattern) |

```
┌─────────────────────────────────────────────────────────┐
│  AgentServer (GenServer)                                │
│  ┌───────────────────────────────────────────────────┐  │
│  │  SessionAgent (pure, immutable)                   │  │
│  │  ┌─────────────────────────────────────────────┐  │  │
│  │  │  state:                                     │  │  │
│  │  │    status: :idle | :running | :completed    │  │  │
│  │  │    session: %{model, provider, policy, ...} │  │  │
│  │  │    run: %{id, seq, phase, token_usage, ...} │  │  │
│  │  │    __thread__: Jido.Thread (append-only)    │  │  │
│  │  │    __memory__: Jido.Memory (mutable beliefs)│  │  │
│  │  └─────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────┘  │
│                                                         │
│  Plugins:                                               │
│    Thread.Plugin   ── owns :__thread__                  │
│    ProviderPlugin  ── LLM execution, capabilities       │
│    PolicyPlugin    ── limits, tool rules                │
│    RouterPlugin    ── circuit breaker, failover         │
│                                                         │
│  Directives → queue → drain loop → DirectiveExec        │
│    %Emit{}, %Schedule{}, %Stop{}, %LLMRequest{}, ...    │
└─────────────────────────────────────────────────────────┘
```

A "session agent" is a normal Jido Agent with standardized state keys. No new struct needed — just conventions.
```elixir
defmodule MyApp.SessionAgent do
  use Jido.Agent,
    name: "session_agent",
    description: "LLM interaction session",
    schema:
      Zoi.object(%{
        status: Zoi.atom() |> Zoi.default(:idle),
        session: Zoi.map() |> Zoi.default(%{}),
        run: Zoi.map() |> Zoi.optional(),
        # :total is included because the policy check reads token_usage.total
        token_usage: Zoi.map() |> Zoi.default(%{input: 0, output: 0, total: 0}),
        turn_count: Zoi.integer() |> Zoi.default(0)
      }),
    plugins: [
      {MyApp.Plugins.Provider, %{model: "claude-sonnet-4-5-20250929"}},
      {MyApp.Plugins.Policy, %{max_tokens: 100_000, max_turns: 50}}
    ]
end
```

```
:idle → :running → :completed
                 → :failed
                 → :cancelled
:idle → :paused  → :running (resume)
```

This maps 1:1 to ASM's Session.status transitions. The agent's cmd/2 enforces valid transitions through pattern matching on agent.state.status.
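
One way to express that transition check is a pure function with one clause per legal edge. The sketch below is only an illustration: `MyApp.Session.Status` and `transition/2` are hypothetical names (not part of Jido or ASM), and the edge set is copied from the status diagram above rather than from either code base.

```elixir
defmodule MyApp.Session.Status do
  @moduledoc "Hypothetical transition guard for the session status state machine."

  @type status :: :idle | :running | :paused | :completed | :failed | :cancelled

  # One clause per legal edge; anything else falls through to the error clause.
  @spec transition(status(), status()) ::
          {:ok, status()} | {:error, {:invalid_transition, status(), status()}}
  def transition(:idle, :running), do: {:ok, :running}
  def transition(:idle, :paused), do: {:ok, :paused}
  def transition(:paused, :running), do: {:ok, :running}
  def transition(:running, to) when to in [:completed, :failed, :cancelled], do: {:ok, to}
  def transition(from, to), do: {:error, {:invalid_transition, from, to}}
end
```

An action could call `transition(agent.state.status, :running)` before updating state and return the error tuple unchanged when the edge is not allowed.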
ASM tracks "runs" (individual execution units within a session). In Jido, a "run" is simply a cmd/2 invocation cycle — the signal arrives, cmd/2 processes it, directives are emitted. Track run metadata in agent.state.run and use Thread entries for the durable record:
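```elixir
# Start of run: the action sets run state and appends a thread entry.
# Map.merge/2 is used because :run is optional and may be absent from state,
# in which case the %{map | key: value} update syntax would raise.
Map.merge(agent.state, %{run: %{id: run_id, seq: next_seq, started_at: now, phase: :running}})

# End of run: the action clears run state and appends a completion entry.
Map.merge(agent.state, %{run: nil, turn_count: agent.state.turn_count + 1})
```

ASM's append-only event log maps directly to Jido.Thread with Thread.Entry structs. The kind field replaces ASM's Event.type enum.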

| ASM Event Type | Thread Entry Kind | Payload Shape |
|---|---|---|
| `:session_created` | `:session_start` | `%{agent_id, model, provider}` |
| `:session_completed` | `:session_end` | `%{reason, duration_ms}` |
| `:run_started` | `:run_start` | `%{run_id, input_summary}` |
| `:run_completed` | `:run_end` | `%{run_id, output_summary, token_usage}` |
| `:message_sent` | `:message` | `%{role: :user, content: "..."}` |
| `:message_received` | `:message` | `%{role: :assistant, content: "..."}` |
| `:tool_call_started` | `:tool_call` | `%{tool, args, call_id}` |
| `:tool_call_completed` | `:tool_result` | `%{tool, result, call_id}` |
| `:error_occurred` | `:error` | `%{type, message, details}` |
| `:token_usage_updated` | `:usage` | `%{input, output, total}` |
| `:policy_violation` | `:policy_violation` | `%{policy, limit, actual}` |

- Append-only semantics: `Thread.append/2` (entries are immutable once added)
- Monotonic ordering: `Entry.seq` (auto-assigned by `Thread.append/2`)
- Cross-references: `Entry.refs` (link to signals, instructions, actions)
- Durable journaling: `Jido.Storage.append_thread/3` with `expected_rev` for optimistic concurrency
- Checkpoint invariants: `Jido.Persist` flushes journal before checkpoint, stores thread pointer (not full thread)
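
To make the ordering and optimistic-concurrency points concrete, here is a minimal in-memory sketch. It does not call Jido: `MyApp.Sketch.Journal`, its struct fields, and the `:conflict` error are hypothetical names chosen for illustration, and the real `Jido.Thread` and `Jido.Storage` return shapes may differ.

```elixir
defmodule MyApp.Sketch.Journal do
  @moduledoc "Hypothetical in-memory journal showing seq assignment and expected_rev checks."

  defstruct rev: 0, entries: []

  @type t :: %__MODULE__{rev: non_neg_integer(), entries: [map()]}

  @spec append(t(), [map()], keyword()) :: {:ok, t()} | {:error, :conflict}
  def append(%__MODULE__{rev: rev} = journal, new_entries, opts \\ []) do
    case Keyword.get(opts, :expected_rev) do
      expected when is_integer(expected) and expected != rev ->
        # Another writer advanced the journal; the caller must reload and retry.
        {:error, :conflict}

      _ ->
        # Assign monotonic sequence numbers continuing from the current revision.
        numbered =
          new_entries
          |> Enum.with_index(rev)
          |> Enum.map(fn {entry, seq} -> Map.put(entry, :seq, seq) end)

        {:ok, %__MODULE__{rev: rev + length(numbered), entries: journal.entries ++ numbered}}
    end
  end
end
```

Existing entries are never rewritten; a stale `expected_rev` is rejected before anything is appended, which is the property the journaling bullet describes.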
ASM's TranscriptBuilder reconstructs conversation history from events. In Jido, this is a projection function over Thread:
```elixir
defmodule MyApp.Session.Transcript do
  @moduledoc "Project Thread entries into LLM-ready message lists."

  alias Jido.Thread

  @spec to_messages(Thread.t(), keyword()) :: [map()]
  def to_messages(%Thread{entries: entries}, opts \\ []) do
    entries
    |> Enum.filter(&(&1.kind in [:message, :tool_call, :tool_result]))
    |> Enum.map(&entry_to_message/1)
    |> maybe_truncate(opts)
  end

  @spec window(Thread.t(), non_neg_integer()) :: [map()]
  def window(thread, token_budget) do
    thread
    |> to_messages()
    |> Enum.reverse()
    |> Enum.reduce_while({[], 0}, fn msg, {acc, tokens} ->
      msg_tokens = estimate_tokens(msg)

      if tokens + msg_tokens <= token_budget do
        {:cont, {[msg | acc], tokens + msg_tokens}}
      else
        {:halt, {acc, tokens}}
      end
    end)
    |> elem(0)
  end

  defp entry_to_message(%{kind: :message, payload: p}),
    do: %{role: p.role, content: p.content}

  defp entry_to_message(%{kind: :tool_call, payload: p}),
    do: %{role: :assistant, tool_calls: [p]}

  defp entry_to_message(%{kind: :tool_result, payload: p}),
    do: %{role: :tool, content: p.result, tool_call_id: p.call_id}

  defp maybe_truncate(messages, opts) do
    case Keyword.get(opts, :max_messages) do
      nil -> messages
      n -> Enum.take(messages, -n)
    end
  end

  defp estimate_tokens(%{content: c}) when is_binary(c), do: div(byte_size(c), 4)
  defp estimate_tokens(_), do: 10
end
```

This replaces ASM's TranscriptBuilder, Transcript, and NormalizedEvent modules with a single pure module: no process, no state, just data transformation.
ASM's ProviderAdapter.execute/4 is an external side effect — it calls an LLM API. In Jido, external effects are described by directives and executed by the runtime.
```elixir
defmodule MyApp.Directive.LLMRequest do
  @moduledoc "Request LLM completion from a provider."

  @schema Zoi.struct(
            __MODULE__,
            %{
              provider: Zoi.atom(description: "Provider module"),
              model: Zoi.string(description: "Model identifier"),
              messages: Zoi.list(Zoi.any(), description: "Message list"),
              tools: Zoi.list(Zoi.any(), description: "Available tools") |> Zoi.default([]),
              temperature: Zoi.float(description: "Sampling temperature") |> Zoi.optional(),
              max_tokens: Zoi.integer(description: "Max output tokens") |> Zoi.optional(),
              reply_signal:
                Zoi.string(description: "Signal type for the response")
                |> Zoi.default("session.llm.response"),
              metadata: Zoi.map(description: "Provider-specific options") |> Zoi.default(%{})
            },
            coerce: true
          )

  @type t :: unquote(Zoi.type_spec(@schema))
  @enforce_keys Zoi.Struct.enforce_keys(@schema)
  defstruct Zoi.Struct.struct_fields(@schema)
end
```

The runtime executes this directive by calling the provider, then sending a signal back:
```elixir
defimpl Jido.AgentServer.DirectiveExec, for: MyApp.Directive.LLMRequest do
  # Callback name and arity follow Jido 2.x's DirectiveExec protocol (exec/3);
  # confirm against the Jido version in use.
  def exec(directive, _input_signal, server_state) do
    # Directives execute inside the AgentServer process, so self() is the agent.
    agent_pid = self()

    # Run the provider call in an unlinked task to avoid blocking the agent server.
    Task.start(fn ->
      result =
        directive.provider.chat(directive.model, directive.messages,
          tools: directive.tools,
          temperature: directive.temperature,
          max_tokens: directive.max_tokens
        )

      # Route the result back through the AgentServer as a signal.
      signal = Jido.Signal.new!(directive.reply_signal, %{result: result})
      Jido.AgentServer.cast(agent_pid, signal)
    end)

    {:ok, server_state}
  end
end
```

```elixir
defmodule MyApp.Actions.RequestLLM do
  use Jido.Action,
    # Underscores only: dotted names are reserved here for signal types.
    name: "session_request_llm",
    description: "Build and emit an LLM request",
    schema:
      Zoi.object(%{
        content: Zoi.string(description: "User message content")
      })

  def run(%{content: content}, %{agent: agent}) do
    thread = agent.state.__thread__
    messages = MyApp.Session.Transcript.to_messages(thread)
    model = agent.state.session.model

    directive = %MyApp.Directive.LLMRequest{
      provider: agent.state.session.provider,
      model: model,
      messages: messages ++ [%{role: :user, content: content}],
      tools: get_available_tools(agent)
    }

    {:ok, %{status: :running, run: %{id: Jido.Util.generate_id(), phase: :llm_pending}}, [directive]}
  end

  # Placeholder: where tool definitions live is an assumption (hypothetical :tools key).
  defp get_available_tools(agent), do: Map.get(agent.state.session, :tools, [])
end
```

```elixir
defmodule MyApp.Plugins.Provider do
  use Jido.Plugin,
    name: "llm_provider",
    state_key: :__provider__,
    actions: [MyApp.Actions.RequestLLM, MyApp.Actions.HandleLLMResponse],
    capabilities: [:llm_provider],
    schema:
      Zoi.object(%{
        model: Zoi.string() |> Zoi.default("claude-sonnet-4-5-20250929"),
        provider: Zoi.atom() |> Zoi.default(MyApp.Providers.Claude),
        api_key_env: Zoi.string() |> Zoi.optional()
      }),
    signal_routes: [
      {"session.user_message", MyApp.Actions.RequestLLM},
      {"session.llm.response", MyApp.Actions.HandleLLMResponse}
    ]
end
```

```elixir
defmodule MyApp.Plugins.Policy do
  use Jido.Plugin,
    name: "session_policy",
    state_key: :__policy__,
    actions: [],
    capabilities: [:policy_enforcement],
    schema:
      Zoi.object(%{
        max_tokens: Zoi.integer() |> Zoi.default(100_000),
        max_turns: Zoi.integer() |> Zoi.default(100),
        max_duration_ms: Zoi.integer() |> Zoi.optional(),
        tool_allow: Zoi.list(Zoi.string()) |> Zoi.optional(),
        tool_deny: Zoi.list(Zoi.string()) |> Zoi.optional(),
        on_violation: Zoi.atom() |> Zoi.default(:cancel)
      })

  @impl Jido.Plugin
  def handle_signal(signal, %{agent: agent} = ctx) do
    policy = agent.state.__policy__

    cond do
      exceeds_token_limit?(agent, policy) ->
        {:error,
         Jido.Error.execution_error("Token budget exceeded",
           details: %{limit: policy.max_tokens, used: tokens_used(agent)}
         )}

      exceeds_turn_limit?(agent, policy) ->
        {:error,
         Jido.Error.execution_error("Turn limit exceeded",
           details: %{limit: policy.max_turns, used: agent.state.turn_count}
         )}

      tool_blocked?(signal, policy) ->
        {:error, Jido.Error.execution_error("Tool not allowed by policy")}

      true ->
        {:cont, signal, ctx}
    end
  end

  # Helper sketches; the state and signal shapes below are assumptions.
  defp tokens_used(agent), do: Map.get(agent.state.token_usage, :total, 0)
  defp exceeds_token_limit?(agent, policy), do: tokens_used(agent) >= policy.max_tokens
  defp exceeds_turn_limit?(agent, policy), do: agent.state.turn_count >= policy.max_turns

  # "session.tool_call" is a hypothetical signal type carrying the tool name.
  defp tool_blocked?(%{type: "session.tool_call", data: %{tool: tool}}, policy) do
    allow = Map.get(policy, :tool_allow)
    deny = Map.get(policy, :tool_deny) || []
    tool in deny or (is_list(allow) and tool not in allow)
  end

  defp tool_blocked?(_signal, _policy), do: false
end
```

```elixir
defmodule MyApp.Plugins.Router do
  use Jido.Plugin,
    name: "provider_router",
    state_key: :__router__,
    actions: [],
    capabilities: [:provider_routing],
    schema:
      Zoi.object(%{
        providers: Zoi.list(Zoi.any(), description: "Ordered provider list"),
        failure_threshold: Zoi.integer() |> Zoi.default(3),
        cooldown_ms: Zoi.integer() |> Zoi.default(30_000),
        health: Zoi.map() |> Zoi.default(%{})
      })

  @impl Jido.Plugin
  def handle_signal(%{type: "session.user_message"} = signal, %{agent: agent} = ctx) do
    router_state = agent.state.__router__

    # select_provider/1 (not shown) picks the first healthy provider.
    case select_provider(router_state) do
      {:ok, _provider} ->
        # The selection is not applied yet: pass it to the request action
        # (for example via signal data) once the override mechanism is chosen.
        {:cont, signal, ctx}

      {:error, :all_unhealthy} ->
        {:error, Jido.Error.execution_error("All providers unhealthy")}
    end
  end

  def handle_signal(signal, ctx), do: {:cont, signal, ctx}
end
```

For per-agent concurrency, AgentServer.max_queue_size already prevents unbounded buildup. For global/cross-agent limiting:
```elixir
defmodule MyApp.Plugins.ConcurrencyLimiter do
  use Jido.Plugin,
    name: "concurrency_limiter",
    state_key: :__limiter__,
    actions: [],
    capabilities: [:concurrency_control],
    schema:
      Zoi.object(%{
        limiter_name: Zoi.atom() |> Zoi.default(:global_limiter)
      })

  # Start a shared limiter process as a plugin child
  @impl Jido.Plugin
  def child_spec(config) do
    [{MyApp.Limiter, name: config.limiter_name, max_concurrent: 50}]
  end

  @impl Jido.Plugin
  def handle_signal(%{type: "session.user_message"} = signal, ctx) do
    case MyApp.Limiter.acquire(ctx.agent.state.__limiter__.limiter_name) do
      :ok ->
        {:cont, signal, ctx}

      {:error, :at_capacity} ->
        # Backpressure: schedule a retry in one second
        {:override, Jido.Agent.Directive.schedule(1000, signal)}
    end
  end

  # Pass every other signal through untouched; without this clause any
  # non-matching signal would raise a FunctionClauseError.
  def handle_signal(signal, ctx), do: {:cont, signal, ctx}
end
```

ASM's SessionStore, DurableStore, and event persistence map directly to existing Jido infrastructure:

| ASM Operation | Jido Equivalent |
|---|---|
| `SessionStore.save_session/2` | `Jido.Persist.hibernate/2` (checkpoint) |
| `SessionStore.get_session/2` | `Jido.Persist.thaw/3` (restore) |
| `SessionStore.append_event/2` | `Jido.Storage.append_thread/3` (journal) |
| `SessionStore.get_events/3` | `Jido.Storage.load_thread/2` + filter |
| `DurableStore.flush/2` | `Jido.Persist.hibernate/2` (flush journal → checkpoint) |
| `InMemorySessionStore` | `Jido.Storage.ETS` |
| Event deduplication | `Thread.Entry.id` + `expected_rev` on append |
| Event sequence numbers | `Thread.Entry.seq` (monotonic within thread) |

```
hibernate/2:
  1. Extract thread from agent.state.__thread__
  2. Flush journal: Storage.append_thread(thread_id, entries)
  3. Create checkpoint: remove __thread__, store pointer {id, rev}
  4. Save: Storage.put_checkpoint(key, checkpoint_data)

thaw/3:
  1. Load checkpoint: Storage.get_checkpoint(key)
  2. Restore agent from checkpoint data
  3. Load thread: Storage.load_thread(thread_id)
  4. Verify thread.rev matches checkpoint pointer
  5. Reattach thread to agent.state.__thread__
```

This is exactly what ASM does across SessionStore, DurableStore, and SessionStoreBridge — but Jido has it in one coordinated module.
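
The pointer invariant in the two flows above can be sketched without Jido at all. In the example below a plain map stands in for the storage adapter; `MyApp.Sketch.Checkpoint`, the `:thread_ptr` key, and the error atoms are hypothetical and only illustrate the steps, not the actual `Jido.Persist` implementation.

```elixir
defmodule MyApp.Sketch.Checkpoint do
  @moduledoc "Hypothetical illustration of the checkpoint / thread-pointer invariant."

  # `store` is a plain map standing in for a storage adapter:
  # %{threads: %{thread_id => thread}, checkpoints: %{key => checkpoint}}
  def hibernate(store, key, %{__thread__: thread} = state) do
    # Flush the journal first, then save state with a pointer instead of the thread.
    store = put_in(store, [:threads, thread.id], thread)

    checkpoint =
      state
      |> Map.delete(:__thread__)
      |> Map.put(:thread_ptr, {thread.id, thread.rev})

    put_in(store, [:checkpoints, key], checkpoint)
  end

  def thaw(store, key) do
    with {:ok, checkpoint} <- Map.fetch(store.checkpoints, key),
         {id, rev} = checkpoint.thread_ptr,
         {:ok, %{rev: ^rev} = thread} <- Map.fetch(store.threads, id) do
      # Reattach the thread only after its revision matches the stored pointer.
      {:ok, checkpoint |> Map.delete(:thread_ptr) |> Map.put(:__thread__, thread)}
    else
      :error -> :not_found
      {:ok, _stale_thread} -> {:error, :thread_rev_mismatch}
    end
  end
end
```

Storing `{id, rev}` rather than the whole thread keeps checkpoints small and makes a checkpoint that points at a different journal revision detectable at restore time.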
| ASM Concept | Jido Equivalent |
|---|---|
| Event-based observability | Jido.Observe (:telemetry wrapper) |
| Telemetry spans | Jido.Observe.with_span/3 |
| Correlation IDs | Jido.Tracing.Context (trace_id, span_id, causation_id) |
| Structured logging | Jido.Telemetry (interestingness filtering) |
| Token usage tracking | Thread entries with :usage kind |
ASM's workspace instrumentation (snapshots, diffs, rollback) has no Jido equivalent. Implement as a plugin with an external storage adapter:
```elixir
defmodule MyApp.Plugins.Workspace do
  use Jido.Plugin,
    name: "workspace",
    state_key: :__workspace__,
    actions: [MyApp.Actions.TakeSnapshot, MyApp.Actions.ComputeDiff],
    capabilities: [:workspace],
    schema:
      Zoi.object(%{
        path: Zoi.string(),
        backend: Zoi.atom() |> Zoi.default(:git),
        snapshots: Zoi.list(Zoi.any()) |> Zoi.default([])
      })
end
```

A thin module for common operations (not a new runtime):
```elixir
defmodule MyApp.Session do
  @moduledoc "High-level session operations over Jido primitives."

  alias Jido.{AgentServer, Persist}

  def start_or_resume(session_id, opts \\ []) do
    storage = Keyword.get(opts, :storage, {Jido.Storage.ETS, []})

    case Persist.thaw(storage, MyApp.SessionAgent, session_id) do
      {:ok, agent} -> AgentServer.start(agent: agent, agent_module: MyApp.SessionAgent)
      :not_found -> AgentServer.start(agent: MyApp.SessionAgent, id: session_id)
      # Surface storage failures instead of crashing with a CaseClauseError
      {:error, _reason} = error -> error
    end
  end

  def send_message(server, content) do
    signal = Jido.Signal.new!("session.user_message", %{content: content})
    AgentServer.call(server, signal)
  end

  def get_transcript(server) do
    {:ok, state} = AgentServer.state(server)
    MyApp.Session.Transcript.to_messages(state.agent.state.__thread__)
  end

  def hibernate(server, storage) do
    {:ok, state} = AgentServer.state(server)
    Persist.hibernate(storage, state.agent)
  end
end
```

For LLM streaming responses, use incremental signals:
```
LLM Provider → Task → AgentServer
  signal: "session.llm.delta"  {content: "partial..."}
  signal: "session.llm.delta"  {content: "more..."}
  signal: "session.llm.done"   {finish_reason: :stop, usage: {...}}
```

Each "session.llm.delta" signal appends a :llm_delta thread entry. The final "session.llm.done" signal triggers consolidation into a single :message entry with role: :assistant.
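
A minimal sketch of that consolidation step follows, assuming entries are plain maps shaped like `%{kind: atom, payload: map}`. The module and function names are hypothetical. Because the thread is append-only, the function only computes the `:message` entry to append and leaves the delta entries in place.

```elixir
defmodule MyApp.Sketch.StreamConsolidation do
  @moduledoc "Hypothetical consolidation of trailing :llm_delta entries into one message."

  # Returns the :message entry to append, or nil when no deltas are pending.
  @spec final_message([map()]) :: map() | nil
  def final_message(entries) do
    # Only the deltas at the tail belong to the response that just finished;
    # deltas from earlier turns sit before an already-consolidated message.
    deltas =
      entries
      |> Enum.reverse()
      |> Enum.take_while(&(&1.kind == :llm_delta))
      |> Enum.reverse()

    case deltas do
      [] ->
        nil

      _ ->
        content = Enum.map_join(deltas, "", & &1.payload.content)
        %{kind: :message, payload: %{role: :assistant, content: content}}
    end
  end
end
```

The action handling the final signal would append the returned entry to the thread; the transcript projection then ignores `:llm_delta` entries because it filters on `:message`, `:tool_call`, and `:tool_result`.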

| ASM Error Code | Jido Error |
|---|---|
| `:validation_error` / `:invalid_input` | `Jido.Error.validation_error/2` |
| `:invalid_transition` / `:invalid_status` | `Jido.Error.validation_error/2` (`kind: :state`) |
| `:not_found` / `:session_not_found` | `Jido.Error.execution_error/2` |
| `:provider_error` / `:provider_timeout` | `Jido.Error.execution_error/2` (`phase: :provider`) |
| `:provider_rate_limited` | `Jido.Error.execution_error/2` (retryable via policy) |
| `:storage_error` | `Jido.Error.internal_error/2` |
| `:timeout` | `Jido.Error.timeout_error/2` |
| `:policy_violation` | `Jido.Error.execution_error/2` (`phase: :policy`) |
| `:tool_error` / `:tool_not_found` | `Jido.Error.execution_error/2` (`phase: :tool`) |
| `:max_sessions_exceeded` | `Jido.Error.execution_error/2` |

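
The mapping above could live in code as a small lookup. The sketch below is hypothetical (`MyApp.Sketch.ErrorMap` is not an existing module). It returns the constructor name and context instead of calling `Jido.Error`, so it runs without the dependency, and the `retryable: true` flag is one assumed reading of "retryable via policy".

```elixir
defmodule MyApp.Sketch.ErrorMap do
  @moduledoc "Hypothetical lookup from ASM error codes to Jido.Error constructor names."

  # Returns {constructor, context}; a caller would pass these on to the
  # matching Jido.Error constructor listed in the table.
  @spec map(atom()) :: {atom(), map()}
  def map(code) when code in [:validation_error, :invalid_input],
    do: {:validation_error, %{}}

  def map(code) when code in [:invalid_transition, :invalid_status],
    do: {:validation_error, %{kind: :state}}

  def map(code) when code in [:provider_error, :provider_timeout],
    do: {:execution_error, %{phase: :provider}}

  def map(:provider_rate_limited), do: {:execution_error, %{retryable: true}}
  def map(:policy_violation), do: {:execution_error, %{phase: :policy}}

  def map(code) when code in [:tool_error, :tool_not_found],
    do: {:execution_error, %{phase: :tool}}

  def map(:storage_error), do: {:internal_error, %{}}
  def map(:timeout), do: {:timeout_error, %{}}

  # :not_found, :session_not_found, :max_sessions_exceeded and unknown codes
  def map(_other), do: {:execution_error, %{}}
end
```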

| ASM Component | Why Skip It |
|---|---|
| `SessionManager` (coordinator process) | AgentServer already is the per-session runtime |
| `NormalizedEvent` (canonical event struct) | `Thread.Entry` with `kind` conventions suffices |
| `EventStream` (lazy stream wrapper) | Elixir `Stream` over `Thread.entries` is adequate |
| `ControlOperations` (interrupt/pause GenServer) | Handle via signals: `"session.cancel"`, `"session.pause"` |
| `RunQueue` (FIFO queue) | `AgentServer.State.queue` already exists |
| `PermissionMode` (provider approval semantics) | Pass through provider config, not a separate concept |
| Separate `ArtifactStore` behaviour | Extend `Jido.Storage` if needed, or use plugin state |

- Define `SessionAgent` with standard state schema
- Define thread entry kind conventions (table in §2)
- Implement `Transcript.to_messages/2` and `Transcript.window/2`
- Write session lifecycle actions (Start, Cancel, Complete)
- Write the thin `Session` convenience API

- Define `%LLMRequest{}` directive
- Implement `DirectiveExec` for LLM requests
- Build `ProviderPlugin` with signal routes
- Add streaming support (delta signals → consolidated entries)

- Build `PolicyPlugin` with `handle_signal/2` enforcement
- Build `RouterPlugin` with circuit breaker state
- Add telemetry spans for provider calls

- Build `WorkspacePlugin` if workspace instrumentation is needed
- Add global `ConcurrencyLimiter` plugin if scale requires it
- Add `EctoSessionStore`-equivalent `Jido.Storage` adapter if needed
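
The global `ConcurrencyLimiter` item depends on a shared limiter process (`MyApp.Limiter` in the plugin shown earlier) that the document leaves undefined. A minimal sketch of such a process follows. The `acquire/1` and `release/1` API and the `:max_concurrent` option are assumptions inferred from how the plugin calls it, not an existing library.

```elixir
defmodule MyApp.Limiter do
  @moduledoc "Hypothetical counting limiter; the acquire/release API is an assumption."
  use GenServer

  def start_link(opts) do
    max = Keyword.fetch!(opts, :max_concurrent)
    GenServer.start_link(__MODULE__, max, name: Keyword.fetch!(opts, :name))
  end

  def acquire(name), do: GenServer.call(name, :acquire)
  def release(name), do: GenServer.call(name, :release)

  @impl true
  def init(max), do: {:ok, %{max: max, holders: %{}}}

  @impl true
  def handle_call(:acquire, {pid, _tag}, %{max: max, holders: holders} = state) do
    if map_size(holders) < max do
      # Monitor the caller so a crashed agent cannot leak its slot.
      ref = Process.monitor(pid)
      {:reply, :ok, %{state | holders: Map.put(holders, ref, pid)}}
    else
      {:reply, {:error, :at_capacity}, state}
    end
  end

  def handle_call(:release, {pid, _tag}, state) do
    # Free one slot held by the caller, if it holds any.
    case Enum.find(state.holders, fn {_ref, holder} -> holder == pid end) do
      {ref, _pid} ->
        Process.demonitor(ref, [:flush])
        {:reply, :ok, %{state | holders: Map.delete(state.holders, ref)}}

      nil ->
        {:reply, :ok, state}
    end
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    {:noreply, %{state | holders: Map.delete(state.holders, ref)}}
  end
end
```

With this shape, whichever action handles the end of a run would need to call `MyApp.Limiter.release/1`; otherwise slots are only reclaimed when the agent process exits.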

| Feature | Covered By | Notes |
|---|---|---|
| Session CRUD | Agent state + Persist | |
| Session status transitions | Agent `cmd/2` pattern matching | |
| Run lifecycle | Agent state + Thread entries | |
| Append-only event log | `Jido.Thread` | |
| Event deduplication | `Thread.Entry.id` | |
| Monotonic sequence numbers | `Thread.Entry.seq` | |
| Transcript reconstruction | Pure projection function | New module |
| Context window management | Projection with token budget | New function |
| Provider abstraction | Plugin + custom directive | New plugin |
| Provider capability negotiation | Plugin `:capabilities` + `:requires` | |
| Policy enforcement | Plugin `handle_signal/2` | New plugin |
| Circuit breaker | Plugin state + signal override | New plugin |
| Provider routing / failover | Plugin `handle_signal/2` | New plugin |
| Concurrency limiting (per-agent) | `AgentServer.max_queue_size` | |
| Concurrency limiting (global) | Plugin with shared child process | New plugin |
| Checkpoint persistence | `Jido.Persist` | |
| Journal persistence | `Jido.Storage.append_thread/3` | |
| ETS storage | `Jido.Storage.ETS` | |
| Telemetry / observability | `Jido.Observe` + `Jido.Telemetry` | |
| Correlation tracing | `Jido.Tracing.Context` | |
| Component discovery | `Jido.Discovery` | |
| Cron scheduling | `Jido.Scheduler` + `%Directive.Cron{}` | |
| Async completion waiting | `Jido.Await` | |
| Parent-child agent hierarchy | `%Directive.SpawnAgent{}` | |
| Workspace snapshots/diffs | New `WorkspacePlugin` | If needed |
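
The circuit breaker and failover rows rely on a provider-selection helper (`select_provider/1` in the Router plugin) that is not defined anywhere in this analysis. One possible shape is sketched below, assuming the router state is a plain map with the schema fields `providers`, `failure_threshold`, `cooldown_ms`, and `health`. The per-provider health record (`%{failures, opened_at}`) and the `record_failure`/`record_success` helpers are hypothetical.

```elixir
defmodule MyApp.Sketch.CircuitBreaker do
  @moduledoc "Hypothetical provider selection with a failure-count circuit breaker."

  # `health` maps provider => %{failures: integer, opened_at: ms | nil} (assumed shape).
  def select_provider(state, now_ms \\ System.monotonic_time(:millisecond)) do
    case Enum.find(state.providers, &healthy?(state, &1, now_ms)) do
      nil -> {:error, :all_unhealthy}
      provider -> {:ok, provider}
    end
  end

  def record_failure(state, provider, now_ms \\ System.monotonic_time(:millisecond)) do
    entry = Map.get(state.health, provider, %{failures: 0, opened_at: nil})
    failures = entry.failures + 1
    # Open the circuit once the failure threshold is reached.
    opened_at = if failures >= state.failure_threshold, do: now_ms, else: entry.opened_at
    put_in(state, [:health, provider], %{failures: failures, opened_at: opened_at})
  end

  def record_success(state, provider) do
    put_in(state, [:health, provider], %{failures: 0, opened_at: nil})
  end

  defp healthy?(state, provider, now_ms) do
    case Map.get(state.health, provider) do
      nil -> true
      %{opened_at: nil} -> true
      # After the cooldown the provider becomes eligible again (half-open).
      %{opened_at: opened_at} -> now_ms - opened_at >= state.cooldown_ms
    end
  end
end
```

Providers are tried in list order, so the first entry acts as the primary and later entries as failover targets. The action handling the provider response would call `record_failure/3` or `record_success/2` and write the result back to the plugin state.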