Engines¶
LLMEngine is the LLM-driven tool-calling loop; Plan is the
deterministic-DAG engine and Step is its unit; ReplanEngine is the
adaptive counterpart to Plan for pipelines whose shape is decided at
runtime by a planner agent (PlanRound / Task are its output schema).
PlanCompileError fires at construction for invalid DAGs;
ToolTimeoutError and StreamStallError surface from the LLM engine's
safety nets.
For narrative usage see Guides → Full → Plan, Step, ReplanEngine, and the Engine protocol (extension surface).
LLM engine¶
LLMEngine ships several production-grade knobs that are easy to
miss in the auto-generated signature below. Quick reference:
| Knob | Default | Purpose |
|---|---|---|
| max_turns | 20 | Cap on tool-calling rounds; prevents runaway loops |
| tool_choice | "auto" | "auto" / "any"; first-turn force-to-call mapped per provider |
| max_retries | 3 | Provider transient-error retries with exponential backoff + jitter |
| request_timeout | 120.0 | Per-completion deadline. Distinct from Agent(timeout=N) (total run). None disables |
| max_parallel_tools | 8 | Cap on concurrent tool calls within one turn. None = unbounded |
| tool_timeout | None | Per-tool asyncio.wait_for deadline; on timeout reports is_error=True to the model loop |
| stream_idle_timeout | 90.0 | Idle gap between streaming chunks before StreamStallError; pass None to disable (a one-shot UserWarning is emitted at LLMEngine.__init__ time, not at stream time). |
| stream_buffer | 64 | Bounded queue for streaming producers. Must be ≥1 |
| allow_dangerous_native_tools | False | Security gate for CODE_EXECUTION / COMPUTER_USE; opt-in required |
| thinking | False / ThinkingConfig | Extended-thinking opt-in. Anthropic Opus 4.6+ / Claude 4.7 use adaptive thinking (server-managed budget; pass display="omitted" to hide thoughts). OpenAI o1/o3/o4/gpt-5 and Gemini 2.5+ surface reasoning_tokens automatically; passing a ThinkingConfig(effort=...) is forwarded where the provider supports it. See provider-capability matrix in lazybridge.matrix. |
| strict_multimodal | False | Raise UnsupportedFeatureError when the model doesn't support an attachment modality |
strict_native_tools is not an LLMEngine knob — it lives on
BaseProvider (constructor arg + class attribute). Configure it
where you build the provider, e.g. AnthropicProvider(...,
strict_native_tools=True); LLMEngine reads it off the resolved
provider at request time. See
BaseProvider.
lazybridge.LLMEngine ¶
LLMEngine(model: str, *, provider: str | None = None, thinking: bool = False, max_turns: int = 20, tool_choice: Literal['auto', 'any'] = 'auto', temperature: float | None = None, system: str | None = None, native_tools: list[NativeTool | str] | None = None, allow_dangerous_native_tools: bool = False, max_retries: int = 3, retry_delay: float = 1.0, request_timeout: float | None = 120.0, max_parallel_tools: int | None = 8, max_tool_calls_per_turn: int | None = None, tool_timeout: float | None = None, stream_idle_timeout: float | None = _USE_DEFAULT_STREAM_IDLE, stream_buffer: int = 64, cache: bool | Any = False, strict_multimodal: bool = False)
Drives the LLM ↔ tool-call loop for a single agent invocation.
Parameters¶
model:
Model string, e.g. "claude-opus-4-7". Provider is inferred automatically.
thinking:
Enable extended thinking (Anthropic) or reasoning (OpenAI o-series).
max_turns:
Maximum tool-call rounds before giving up with a MaxTurnsExceeded error.
Default 20: a plain tool-using task typically completes in 2-5 rounds;
the 20-round budget leaves headroom for deeper reasoning loops without
capping legitimate pipelines. Raise it for deliberately long agentic
tasks; lower it during development to fail fast.
tool_choice:
"auto": the provider decides when to call tools (default).
"any": the provider must call at least one tool on the first turn.
When ``"any"`` is set, the engine maps it to ``"required"`` on
the wire (Anthropic and OpenAI both reject the literal string
``"any"`` as an unknown tool name). After the model satisfies
the "must call at least one tool" contract on the first turn,
the engine resets to ``"auto"`` for all subsequent turns so the
model can produce a final text answer. Without this reset the
model would be forced to call tools on every turn, causing an
infinite loop until ``max_turns`` is exhausted.
When the model emits multiple tool calls in a single turn,
LazyBridge always executes them concurrently via ``asyncio.gather``.
That is a capability of the engine, not a configuration knob;
there is no "serial" execution path for LLM-emitted tool calls.
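The first-turn mapping described above can be sketched in a few lines. This is an illustrative sketch, not LazyBridge's code: `wire_tool_choice` is a hypothetical helper, and the only behaviour taken from the text is that "any" goes out as "required" on the first turn and as "auto" on every later turn.

```python
from typing import Literal

ToolChoice = Literal["auto", "any"]


def wire_tool_choice(configured: ToolChoice, turn: int) -> str:
    """Return the value sent on the wire for a given 0-based turn (hypothetical helper)."""
    if configured == "any" and turn == 0:
        # Force at least one tool call on the first turn only.
        return "required"
    # Later turns must be free to end with a plain text answer.
    return "auto"


# Values sent over the first three turns when tool_choice="any" is configured.
choices = [wire_tool_choice("any", turn) for turn in range(3)]
```

Without the reset on later turns the list would be all "required", which is the forced-tool loop the text warns about.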
temperature:
Sampling temperature. None = provider default.
system:
Static system prompt. Agent.sources= / Envelope.context are added on top.
native_tools:
Provider-native server-side tools, e.g. NativeTool.WEB_SEARCH.
max_retries:
Retries on transient provider errors (429, 5xx, network/timeout).
Default 3 — production-safe. Pass 0 to disable.
retry_delay:
Base delay (seconds) for exponential backoff with ±10% jitter.
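A minimal sketch of exponential backoff with ±10% jitter. The text only says "exponential", so the doubling factor is an assumption, and `backoff_delay` is a hypothetical helper rather than the engine's implementation.

```python
import random


def backoff_delay(attempt: int, retry_delay: float = 1.0, jitter: float = 0.10) -> float:
    """Delay before retry number `attempt` (0-based).

    Assumes the base delay doubles per attempt; the result is then
    scaled by a random factor in [1 - jitter, 1 + jitter].
    """
    base = retry_delay * (2 ** attempt)
    return base * (1.0 + random.uniform(-jitter, jitter))


# With retry_delay=1.0 the three retries wait roughly 1 s, 2 s and 4 s.
delays = [backoff_delay(attempt) for attempt in range(3)]
```

The jitter keeps many clients that failed at the same moment from retrying in lockstep.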
request_timeout:
Per-completion deadline in seconds. Caps the time a hung
provider can block an agent run. None disables the
framework-level timeout and defers to the provider SDK.
max_parallel_tools:
Maximum number of tool calls executed concurrently within a
single model turn. Default is 8. None means unbounded
— every tool call returned by the model runs in parallel. Set
to a small integer (e.g. 4–8) to apply backpressure on wide
tool fan-outs and prevent thread/socket/DB exhaustion on a
single turn.
max_tool_calls_per_turn:
Maximum number of tool calls executed per model turn — distinct
from max_parallel_tools (which only bounds how many run
concurrently). None (default) executes every call the model
emits. Set to 1 to keep a multi-agent graph on a single,
non-branching path: the first call runs and any extras get an
is_error result block telling the model only one call per turn
is allowed, so it learns to emit fewer. See
:class:~lazybridge.AgentPool and :func:~lazybridge.conclude.
tool_timeout:
Per-tool deadline in seconds. When set, each tool execution
is wrapped in asyncio.wait_for. On timeout the tool's
result is reported as is_error=True to the model loop so
the model can recover; the run does not abort. None
(default) leaves tools unbounded.
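How max_parallel_tools and tool_timeout could interact can be sketched with plain asyncio. This is a sketch under stated assumptions, not the engine's code: `ToolResult` and `run_tools` are hypothetical names, and the only behaviour taken from the text is a bounded number of concurrent calls, a per-tool `asyncio.wait_for` deadline, and a timeout reported as an error result instead of aborting the run.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToolResult:
    """Hypothetical stand-in for the result block handed back to the model."""
    name: str
    content: str
    is_error: bool = False


async def run_tools(calls, *, max_parallel_tools=8, tool_timeout=None):
    """Run (name, coroutine_function) pairs concurrently; results keep input order."""
    sem = asyncio.Semaphore(max_parallel_tools) if max_parallel_tools is not None else None

    async def run_one(name, fn):
        async def guarded():
            try:
                # timeout=None means wait_for never times out.
                value = await asyncio.wait_for(fn(), timeout=tool_timeout)
                return ToolResult(name, value)
            except asyncio.TimeoutError:
                # Report the timeout instead of aborting the whole turn.
                return ToolResult(name, f"timed out after {tool_timeout}s", is_error=True)

        if sem is None:
            return await guarded()
        async with sem:  # at most max_parallel_tools calls run at once
            return await guarded()

    return await asyncio.gather(*(run_one(name, fn) for name, fn in calls))


async def fast():
    return "ok"


async def slow():
    await asyncio.sleep(1.0)
    return "never returned"


results = asyncio.run(run_tools([("fast", fast), ("slow", slow)], tool_timeout=0.05))
```

The deadline starts once a call has acquired its semaphore slot, so time spent queueing behind other calls does not count against tool_timeout in this sketch.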
stream_idle_timeout:
Maximum time (seconds) the engine will wait between
successive streaming chunks before raising
StreamStallError. Catches half-open streams without
killing streams that run long but keep delivering chunks. Defaults to
:data:DEFAULT_STREAM_IDLE_TIMEOUT (90.0 s). Pass an
explicit None to disable stall detection — a one-shot
UserWarning is emitted because a half-open provider
stream then pins the worker indefinitely.
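An idle-gap check of this kind can be sketched by putting a deadline on each individual chunk rather than on the stream as a whole. The names below (`StallError`, `with_idle_timeout`, `collect`) are hypothetical, and the sketch is an illustration of the idea, not the engine's implementation.

```python
import asyncio


class StallError(Exception):
    """Hypothetical stand-in for StreamStallError."""


async def with_idle_timeout(chunks, idle_timeout):
    """Re-yield chunks, raising StallError if the gap between two chunks is too long."""
    iterator = chunks.__aiter__()
    while True:
        try:
            # The deadline applies to one chunk, so it restarts after every chunk.
            chunk = await asyncio.wait_for(iterator.__anext__(), timeout=idle_timeout)
        except StopAsyncIteration:
            return
        except asyncio.TimeoutError:
            raise StallError(f"no chunk for {idle_timeout}s") from None
        yield chunk


async def steady():
    for token in ("a", "b", "c"):
        await asyncio.sleep(0.01)
        yield token


async def stalls():
    yield "a"
    await asyncio.sleep(1.0)  # simulated half-open stream
    yield "b"


async def collect(source, idle_timeout):
    out = []
    try:
        async for chunk in with_idle_timeout(source, idle_timeout):
            out.append(chunk)
    except StallError:
        out.append("<stalled>")
    return out


steady_out = asyncio.run(collect(steady(), 0.2))
stalled_out = asyncio.run(collect(stalls(), 0.05))
```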
Source code in lazybridge/engines/llm.py
set_default_provider classmethod ¶
Set (or disable) the fallback provider used when no rule matches.
Two common uses::
    # Production hardening: raise on unknown models rather than
    # silently falling back to the default. Recommended when you
    # only ever want the providers you've explicitly registered.
    LLMEngine.set_default_provider(None)

    # Redirect the safety net to a different built-in:
    LLMEngine.set_default_provider("openai")
Why this helper exists: Agent("grok-2") would default-route
to Anthropic and fail several RTTs later with a cryptic
provider-side "unknown model" error. Disabling
the fallback turns that into a loud ValueError at
construction time.
Source code in lazybridge/engines/llm.py
provider_aliases classmethod ¶
Return a snapshot of the current model-string → provider alias map.
Callers that need to validate a user-supplied model string (or
document the accepted aliases) should read from this dict
rather than reach into _PROVIDER_ALIASES directly — the
return value is a fresh copy, so accidental mutation can't
affect the framework's routing. The set is also surfaced in
:data:lazybridge.PROVIDER_ALIASES for top-level convenience.
Source code in lazybridge/engines/llm.py
register_provider_alias classmethod ¶
Register an exact-match model-string → provider alias.
Example::
    LLMEngine.register_provider_alias("mistral", "mistral")
    Agent("mistral")  # resolves to the mistral provider
Thread-safe: serialised via _PROVIDER_REGISTRY_LOCK so two
threads racing here can't lose a registration to a read-then-write
clobber on the class attribute.
Source code in lazybridge/engines/llm.py
register_provider_rule classmethod ¶
register_provider_rule(pattern: str, provider: str, *, kind: Literal['contains', 'startswith'] = 'contains') -> None
Register a substring / prefix routing rule.
New rules take priority over built-ins so you can override default routing without editing the framework source::
    LLMEngine.register_provider_rule("claude-opus-5", "anthropic")
    Agent("claude-opus-5-20260701")  # routed to anthropic
Thread-safe: serialised via _PROVIDER_REGISTRY_LOCK so two
threads racing here can't lose a rule to a read-then-write
clobber on the class attribute.
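The routing pieces documented in this section (exact-match aliases, substring / prefix rules with newest-first priority, an optional default, and a registry lock) can be sketched as a small resolver. `ProviderRouter` is a hypothetical class, not LazyBridge's implementation, and checking aliases before rules is an assumption made for the sketch.

```python
from __future__ import annotations

import threading


class ProviderRouter:
    """Hypothetical stand-in for the engine's provider registry."""

    def __init__(self, default: str | None = "anthropic"):
        self._lock = threading.Lock()
        self._aliases: dict[str, str] = {}
        self._rules: list[tuple[str, str, str]] = []  # (kind, pattern, provider)
        self._default = default

    def register_alias(self, model: str, provider: str) -> None:
        with self._lock:  # build a new dict under the lock; no read-then-write clobber
            self._aliases = {**self._aliases, model: provider}

    def register_rule(self, pattern: str, provider: str, *, kind: str = "contains") -> None:
        with self._lock:
            # Newest rule goes first so it takes priority over older ones.
            self._rules = [(kind, pattern, provider), *self._rules]

    def set_default(self, provider: str | None) -> None:
        with self._lock:
            self._default = provider

    def resolve(self, model: str) -> str:
        if model in self._aliases:  # assumption: exact aliases win over rules
            return self._aliases[model]
        for kind, pattern, provider in self._rules:
            hit = model.startswith(pattern) if kind == "startswith" else pattern in model
            if hit:
                return provider
        if self._default is None:
            raise ValueError(f"no provider rule matches model {model!r}")
        return self._default


router = ProviderRouter()
router.register_rule("gpt", "openai", kind="startswith")
router.register_rule("gpt-4", "azure")  # newer rule, checked first
newer_rule_wins = router.resolve("gpt-4o")
older_rule_still_applies = router.resolve("gpt-3.5-turbo")
fallback = router.resolve("grok-2")

router.set_default(None)  # hardening: unknown models now fail loudly
try:
    router.resolve("grok-2")
    raised = False
except ValueError:
    raised = True
```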
Source code in lazybridge/engines/llm.py
stream async ¶
stream(env: Envelope[Any], *, tools: list[Tool], output_type: type, memory: Memory | None, session: Session | None) -> AsyncGenerator[str, None]
Stream tokens from the full tool-calling loop.
Yields str tokens as the LLM generates them. Tool calls between turns are executed silently; the next-turn response is then streamed. This means token output is continuous across tool-call boundaries.
Source code in lazybridge/engines/llm.py
Plan + Step¶
lazybridge.Plan ¶
Plan(*steps: Step, max_iterations: int = 100, store: Store | None = None, checkpoint_key: str | None = None, resume: bool = False, on_concurrent: Literal['fail', 'fork'] = 'fail')
Structured multi-step execution engine.
Steps run sequentially by default. Routing is explicit at the
Step level via Step(routes={...}) (predicate map) or
Step(routes_by="field") (LLM-decided via a Literal field on
the structured output). Parallel branches via step.parallel=True.
PlanCompiler runs at Agent construction time; errors surface before any LLM call.
Construct a Plan.
Checkpoint / resume¶
Pass store= and checkpoint_key= to persist minimal plan state
(next step to run, writes-bucket values, completed step names)
after every step. Pass resume=True together with a populated
store[checkpoint_key] to pick up where the previous run stopped
(useful after a crash, interrupt, or external pause).
Example::
    store = Store(db="run.sqlite")
    plan = Plan(
        Step(researcher, writes="research"),
        Step(writer, writes="draft"),
        store=store,
        checkpoint_key="my_pipeline",
        resume=True,
    )
    Agent(engine=plan)("topic")  # continues if a checkpoint exists
The persisted shape (v2) includes minimal plan state plus serialized
StepResult history: {"next_step": str, "kv": {...},
"completed_steps": [...], "status": str, "run_uid": str,
"history": [...]}.
History is serialized so a resumed run can re-aggregate
from_parallel_all bands and nested-cost rollup against completed
upstream steps. Only writes-bucket values and step history
survive across process boundaries; live in-memory state does not.
Crash-window durability¶
Each step writes its checkpoint before the durable
store.write(step.writes, value) call. This eliminates
double-writes on resume — the checkpoint already records
next_step as the following step, so a resumed run does not
re-execute the completed step. The trade-off is that a crash in
the gap between the checkpoint and the Store write loses the
durable Store write; the value still lives in the
checkpoint's serialised kv and is read back into in-memory
state on resume, so the Plan continues correctly, but sidecar
consumers reading the Store directly should reconcile against
the checkpoint snapshot rather than assume Store completeness.
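The ordering described above can be illustrated with two plain dicts standing in for the checkpoint and the Store. This is a sketch of the trade-off only: `run_step`, the dict shapes, and the simulated crash are hypothetical, not the Plan engine's code.

```python
def run_step(checkpoint, store, writes, next_step, value, crash_in_gap=False):
    """Checkpoint first, then the durable Store write (hypothetical sketch)."""
    checkpoint["kv"][writes] = value
    checkpoint["completed_steps"].append(writes)
    checkpoint["next_step"] = next_step
    if crash_in_gap:
        raise RuntimeError("simulated crash between checkpoint and Store write")
    store[writes] = value


checkpoint = {"next_step": "research", "kv": {}, "completed_steps": []}
store = {}

try:
    run_step(checkpoint, store, "research", "draft", "notes", crash_in_gap=True)
except RuntimeError:
    pass

# On resume the value is read back from the checkpoint, so the plan continues
# at "draft" without re-running "research" ...
resumed_state = dict(checkpoint["kv"])
# ... but a sidecar reading the Store directly would not see the value.
missing_from_store = [key for key in checkpoint["kv"] if key not in store]
```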
Concurrency¶
Every checkpoint write goes through
:meth:lazybridge.store.Store.compare_and_swap, so two Plan
runs can never silently overwrite each other's state. Two
policies are available via on_concurrent=:
- "fail" (default): checkpoint_key identifies a single in-flight run. A second Plan on the same key, started while the first is still running, raises :class:ConcurrentPlanRunError. This is the correctness floor; pick it when runs legitimately share state (e.g. graceful crash-resume via resume=True).
- "fork": checkpoint_key names the pipeline; each .run() claims its own isolated effective key f"{checkpoint_key}:{run_uid}". Many runs of the same pipeline can execute concurrently with no collision. This is the mode you want for fan-out workflows (N backtests / seeds / tickers sharing a pipeline definition). resume is not supported in fork mode because there is no single shared checkpoint to resume; if you need resume, use on_concurrent="fail" with distinct per-run keys.
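The difference between the two on_concurrent policies can be sketched with a toy compare-and-swap store. Everything here is hypothetical (`MiniStore`, `claim`, `ConcurrentRun`, and the compare_and_swap signature itself, which may differ from the real Store); only the key scheme f"{checkpoint_key}:{run_uid}" and the fail-fast behaviour come from the text.

```python
import threading
import uuid


class MiniStore:
    """Hypothetical in-memory store with an atomic compare-and-swap."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def compare_and_swap(self, key, expected, new):
        with self._lock:
            if self._data.get(key) != expected:
                return False  # someone else wrote first
            self._data[key] = new
            return True


class ConcurrentRun(RuntimeError):
    """Stand-in for ConcurrentPlanRunError."""


def claim(store, checkpoint_key, on_concurrent="fail"):
    """Claim a checkpoint slot and return the effective key used for this run."""
    run_uid = uuid.uuid4().hex
    key = f"{checkpoint_key}:{run_uid}" if on_concurrent == "fork" else checkpoint_key
    if not store.compare_and_swap(key, None, {"status": "running", "run_uid": run_uid}):
        raise ConcurrentRun(f"{key!r} is already claimed by another run")
    return key


store = MiniStore()
first = claim(store, "my_pipeline")
try:
    claim(store, "my_pipeline")  # same key while the first run is in flight
    second_failed = False
except ConcurrentRun:
    second_failed = True

fork_a = claim(store, "backtest", on_concurrent="fork")
fork_b = claim(store, "backtest", on_concurrent="fork")
```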
Source code in lazybridge/engines/plan/_plan.py
run_many ¶
run_many(tasks: list[str | Envelope[Any]], *, concurrency: int | None = None, tools: list[Any] | None = None, memory: Any = None, session: Any = None, output_type: type = str) -> list[Envelope[Any]]
Run this Plan concurrently against N inputs; sync return.
Each task is dispatched as its own Plan.run invocation
on a fresh asyncio task; results are returned as a list in
input order. Pair with Plan(on_concurrent="fork", ...) for
true fan-out workflows where each input claims its own
per-run keyspace.
Errors are returned as error envelopes in the corresponding
slot — the call never raises (matches Agent.parallel
semantics).
concurrency caps the number of in-flight runs via an
asyncio semaphore. None (default) lets every task fire
immediately.
Pass tools when the Plan's steps use string-name targets that
must be resolved against a live tool map. Omitting tools
(or passing []) works only when every step target is an
Agent object rather than a string alias.
See :meth:arun_many for the async variant when the caller is
already inside an event loop.
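The fan-out contract described above (results in input order, errors returned in their slot, an optional semaphore cap) can be sketched with asyncio alone. `fan_out` and `fake_plan_run` are hypothetical, and the dict used as an "envelope" is a simplification of the real Envelope type.

```python
import asyncio


async def fan_out(run, tasks, *, concurrency=None):
    """Run `run(task)` for every task; never raises, keeps input order."""
    sem = asyncio.Semaphore(concurrency) if concurrency is not None else None

    async def one(task):
        try:
            if sem is None:
                payload = await run(task)
            else:
                async with sem:  # cap the number of in-flight runs
                    payload = await run(task)
            return {"payload": payload, "error": None}
        except Exception as exc:
            # The failure stays in its own slot instead of propagating.
            return {"payload": None, "error": f"{type(exc).__name__}: {exc}"}

    return await asyncio.gather(*(one(task) for task in tasks))


async def fake_plan_run(task):
    await asyncio.sleep(0.01)
    if task == "bad":
        raise ValueError("boom")
    return task.upper()


results = asyncio.run(fan_out(fake_plan_run, ["a", "bad", "c"], concurrency=2))
```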
Source code in lazybridge/engines/plan/_plan.py
arun_many async ¶
arun_many(tasks: list[str | Envelope[Any]], *, concurrency: int | None = None, tools: list[Any] | None = None, memory: Any = None, session: Any = None, output_type: type = str) -> list[Envelope[Any]]
Async counterpart to :meth:run_many.
Use this directly when you're already inside an event loop and
want to await the fan-out without the sync-bridge overhead.
Pass tools when the Plan's steps use string-name targets that
must be resolved against a live tool map. Omitting tools
(or passing []) works only when every step target is an
Agent object rather than a string alias.
Source code in lazybridge/engines/plan/_plan.py
to_dict ¶
Serialise the Plan's topology to a JSON-compatible dict.
Callables and Agents are serialised by name only — rebind
them at load time via :meth:from_dict's registry kwarg.
Sentinels, writes, parallel flags, iteration limit, and step
order are preserved faithfully.
Source code in lazybridge/engines/plan/_plan.py
from_dict classmethod ¶
Reconstruct a Plan from a to_dict payload.
registry maps serialised target names back to live callables /
Agents. Missing entries for non-tool targets raise
:class:KeyError with the offending name — keeping the failure
loud rather than producing a silently-broken Plan.
Example::
    saved = plan.to_dict()  # store somewhere
    ...
    plan = Plan.from_dict(saved, registry={
        "researcher": researcher_agent,
        "fetch": fetch_function,
    })
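The "serialise by name, rebind through a registry" idea can be sketched independently of the library. The payload shape below is invented for illustration and is not the real to_dict format; `plan_to_dict` and `plan_from_dict` are hypothetical helpers.

```python
import json


def plan_to_dict(steps):
    """steps: list of (target_name, writes) pairs; targets are stored by name only."""
    return {"steps": [{"target": name, "writes": writes} for name, writes in steps]}


def plan_from_dict(payload, registry):
    """Rebind each serialised target name to a live callable from `registry`."""
    rebuilt = []
    for entry in payload["steps"]:
        name = entry["target"]
        if name not in registry:
            # Fail loudly instead of building a silently broken plan.
            raise KeyError(name)
        rebuilt.append((registry[name], entry["writes"]))
    return rebuilt


def researcher(task):
    return f"notes on {task}"


# Round-trip through JSON to show the payload holds no live objects.
saved = json.loads(json.dumps(plan_to_dict([("researcher", "research")])))
rebuilt = plan_from_dict(saved, registry={"researcher": researcher})

try:
    plan_from_dict(saved, registry={})
    missing_name = None
except KeyError as exc:
    missing_name = exc.args[0]
```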
Source code in lazybridge/engines/plan/_plan.py
lazybridge.Step dataclass ¶
Step(target: Any, task: Sentinel | str = (lambda: from_prev)(), context: Sentinel | str | list[Sentinel | str] | None = None, sources: list[Any] = list(), writes: str | None = None, input: type = Any, output: type = str, parallel: bool = False, name: str | None = None, routes: dict[str, Callable[[Any], bool]] | None = None, routes_by: str | None = None, after_branches: str | None = None, _name_is_opaque: bool = False)
A single node in a Plan.
target and name are two distinct concepts:
- target: which tool to call. When a string, it must match the key registered in the parent Agent's tool map, i.e. the name passed to agent.as_tool("name"). This is the link between the Plan and the tools list::
    researcher.as_tool("research")   → tool map key: "research"
    Step("research")                 → target="research", calls that tool
    Step("research", name="phase1")  → calls "research" tool, step is "phase1"
  When target is a string and name is omitted, name defaults to target, so in the common case they are the same. They only diverge when you want a display name or routing key that differs from the tool name.
- name: the step's identity in the plan. Used for routing (routes={"name": predicate}), sentinel lookups (from_step("name")), checkpointing, and display. If routing breaks silently, check that the target name in routes= matches the name of the intended step, not its target.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target | Any | Tool name (str), callable, or Agent. Required. | required |
| task | Sentinel \| str | Sentinel or str for the step's task. Default: from_prev. | (lambda: from_prev)() |
| context | Sentinel \| str \| list[Sentinel \| str] \| None | Sentinel, str, or list of either for extra context. A list joins its resolved parts with blank-line separators (same shape as …). | None |
| sources | list[Any] | Live-view objects with a .text() method injected into context. | list() |
| writes | str \| None | Key under which Envelope.payload is saved in the Store. | None |
| input | type | Expected input payload type (PlanCompiler validates). | Any |
| output | type | Expected output payload type (triggers structured output). | str |
| parallel | bool | True if this step runs concurrently with siblings. | False |
| name | str \| None | The step's identity for routing, sentinels, and display. Defaults to … | None |
| routes | dict[str, Callable[[Any], bool]] \| None | Predicate-based routing. Mapping … | None |
| routes_by | str \| None | LLM-decided routing via a named field on the step's structured output. Pass the attribute name (e.g. …). | None |
| after_branches | str \| None | Exclusive-branch rejoin point. Only valid alongside … | None |
Without after_branches, routing is a detour: after the
routed-to step runs, linear progression resumes from its declared
position. To make a step terminal, place it at the end of the
declared step list. Loops are simply routes back to an earlier
step; Plan(max_iterations=...) is the safety net.
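The detour and loop semantics can be traced with a toy walker. This is a hypothetical sketch, not the Plan engine: each step's "output" is just the number of times it has run, `walk` is an invented helper, and raising RuntimeError at the cap is an assumption about how the safety net reports.

```python
def walk(steps, routes, max_iterations=100):
    """Trace step names in execution order.

    steps:  declared step names, in order.
    routes: step name -> {target name: predicate(payload) -> bool}.
    """
    index = {name: i for i, name in enumerate(steps)}
    visits = {name: 0 for name in steps}
    trace, i = [], 0
    while i < len(steps):
        if len(trace) >= max_iterations:
            raise RuntimeError("max_iterations exceeded")
        name = steps[i]
        visits[name] += 1
        trace.append(name)
        payload = visits[name]  # stand-in output: how often this step has run
        target = next(
            (t for t, predicate in routes.get(name, {}).items() if predicate(payload)),
            None,
        )
        # Routed: jump to the target. Otherwise continue with the next declared step.
        i = index[target] if target is not None else i + 1
    return trace


# Loop: "review" sends the plan back to "draft" once, then lets it proceed.
loop_trace = walk(["draft", "review", "publish"], {"review": {"draft": lambda n: n < 2}})

# Detour: after the routed-to step runs, progression resumes from its position.
detour_trace = walk(["a", "b", "c"], {"a": {"c": lambda n: True}})

# A route that always fires is stopped by the iteration cap.
try:
    walk(["a", "b"], {"b": {"a": lambda n: True}}, max_iterations=5)
    capped = False
except RuntimeError:
    capped = True
```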
ReplanEngine¶
The adaptive replan-loop engine. A planner tool (built with
output=PlanRound) is called every round; ReplanEngine dispatches the
tasks it emits and checkpoints after each round. See
Guides → Full → ReplanEngine for
narrative usage.
lazybridge.ReplanEngine ¶
ReplanEngine(*, planner_name: str = 'planner', store: Store | None = None, checkpoint_key: str | None = None, resume: bool = False, max_rounds: int = 20)
Engine that guards the dynamic replan loop with checkpoint/resume.
The planner and all worker tools are resolved from the tool_map at run time — nothing is injected at construction except configuration::
    guardian = Agent(
        engine=ReplanEngine(
            store=Store(db="project.sqlite"),
            checkpoint_key="my-project",
            resume=True,
        ),
        tools=[planner, analyst, coder, pool.as_tool("route")],
    )
The planner tool must have output=PlanRound. ReplanEngine builds the
planner's input dynamically (tool schemas + history) so the planner does
not need a static system prompt that lists worker names.
See :class:lazybridge.engines.replan.PlanRound and
:class:lazybridge.engines.replan.Task for the planner output schema.
Source code in lazybridge/engines/replan/_engine.py
lazybridge.PlanRound ¶
Bases: BaseModel
Structured output emitted by the planner agent each round.
ReplanEngine calls the planner tool, deserialises this schema, and
dispatches the tasks. Tasks within the same round with parallel=True
run concurrently via asyncio.gather; parallel=False tasks run
sequentially after the parallel group.
Dependent tasks belong in the next round — after the planner has seen the outputs from this one.
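The dispatch order for one round (parallel group first via asyncio.gather, then the sequential tasks) can be sketched as follows. Plain dicts stand in for Task objects and `dispatch_round` is a hypothetical helper, not ReplanEngine's code.

```python
import asyncio


async def dispatch_round(tasks, tools):
    """tasks: dicts with 'tool', 'kwargs' and 'parallel'; tools: name -> async callable."""
    parallel = [t for t in tasks if t["parallel"]]
    sequential = [t for t in tasks if not t["parallel"]]

    # The parallel group runs concurrently; gather keeps its internal order.
    results = list(
        await asyncio.gather(*(tools[t["tool"]](**t["kwargs"]) for t in parallel))
    )
    # Sequential tasks run one at a time after the parallel group has finished.
    for t in sequential:
        results.append(await tools[t["tool"]](**t["kwargs"]))
    return results


async def add(a, b):
    return a + b


async def echo(task):
    return task


round_tasks = [
    {"tool": "echo", "kwargs": {"task": "summarise"}, "parallel": False},
    {"tool": "add", "kwargs": {"a": 3, "b": 7}, "parallel": True},
    {"tool": "echo", "kwargs": {"task": "analyse"}, "parallel": True},
]
results = asyncio.run(dispatch_round(round_tasks, {"add": add, "echo": echo}))
```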
lazybridge.Task ¶
Bases: BaseModel
A single tool call planned for this round.
tool must match a key in the parent Agent's tool_map — an agent, a
plain function, a pool route, or any other callable wrapped as a Tool.
kwargs are forwarded verbatim to tool.run(**kwargs) so they must
match the tool's JSON schema exactly.
Examples::
    Task(tool="analyst", kwargs={"task": "analyse the auth module"})
    Task(tool="route", kwargs={"agent_name": "alice", "task": "write tests"})
    Task(tool="add", kwargs={"a": 3, "b": 7})
Engine errors¶
lazybridge.PlanCompileError ¶
Bases: Exception
lazybridge.PlanRuntimeError ¶
Bases: RuntimeError
Raised when a Plan step misbehaves at runtime in a way that indicates a programming bug — not a recoverable runtime condition.
The canonical case is a Step(routes={...}) predicate that
raises an exception during evaluation. The engine wraps the
underlying exception in :class:PlanRuntimeError with the
offending step and target named, then propagates.
Distinct from :class:PlanCompileError (which fires at
construction time for static DAG validation) and from
:class:ConcurrentPlanRunError (which fires at runtime CAS
collision). Distinct from regular Envelope.error_envelope
return paths (which signal recoverable runtime failures the
caller can handle without a try/except).
lazybridge.PlanPaused ¶
Bases: BaseException
Raised by a step target to halt the Plan and persist a resumable checkpoint.
Subclasses :class:BaseException (not :class:Exception) so user
code's except Exception clauses do not accidentally swallow the
pause signal — same pattern as :class:KeyboardInterrupt and
:class:SystemExit.
The engine catches PlanPaused after the offending step's
invocation:
- Writes a checkpoint with status="paused" and next_step pointing at the same step (so a future resume=True run re-invokes the step rather than skipping past it).
- Returns an Envelope whose error is ErrorInfo(type="PlanPaused", retryable=True, ...) so the caller can detect the pause and arrange for the resume.
Use this when a step needs to halt the pipeline cooperatively — for example, the step has detected that an external precondition isn't met (webhook hasn't arrived, human approval pending) and the rest of the pipeline cannot proceed yet.
Example::
    from lazybridge.engines.plan import PlanPaused

    def webhook_step(task: str) -> str:
        if not webhook_payload_available():
            raise PlanPaused("waiting for webhook delivery")
        return process(task)
Resume::
    # Same Plan, resume=True picks up at the paused step.
    Agent(
        engine=Plan(
            *steps,
            store=store,
            checkpoint_key="run-42",
            resume=True,
        ),
        tools=[...],
    )("…")
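Why the pause signal derives from BaseException can be shown with a stand-in class. `Paused`, `careless_step`, and `run_with_pause` are hypothetical; the sketch only demonstrates that a broad `except Exception` inside user code does not intercept the signal, so the engine-level handler still sees it.

```python
class Paused(BaseException):
    """Hypothetical stand-in for PlanPaused."""


def careless_step():
    try:
        raise Paused("waiting for webhook delivery")
    except Exception:
        # Not reached: Paused is a BaseException, not an Exception.
        return "swallowed"


def run_with_pause(step, checkpoint, step_name):
    """Engine-side handling: record the pause and return an error envelope."""
    try:
        return {"payload": step(), "error": None}
    except Paused as signal:
        # next_step points at the same step so a resumed run re-invokes it.
        checkpoint.update(status="paused", next_step=step_name)
        return {
            "payload": None,
            "error": {"type": "PlanPaused", "retryable": True, "message": str(signal)},
        }


checkpoint = {}
result = run_with_pause(careless_step, checkpoint, "webhook_step")
```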
Source code in lazybridge/engines/plan/_types.py
lazybridge.ConcurrentPlanRunError ¶
Bases: RuntimeError
Raised when two Plan runs race for the same checkpoint_key.
Checkpoints are serialised through :meth:lazybridge.store.Store.compare_and_swap
so the first writer wins and any second writer fails fast instead of
silently overwriting the first run's state. Derive a unique
checkpoint_key per run (e.g. f"pipeline-{uuid.uuid4().hex}")
when you need concurrent execution on the same :class:Store.
lazybridge.ToolTimeoutError ¶
Bases: Exception
Raised when a tool exceeds LLMEngine.tool_timeout.
The engine catches this internally and reports the failure to the
model loop as ToolResultContent(is_error=True) so the model
can recover; it does not abort the agent run.
lazybridge.StreamStallError ¶
Bases: Exception
Raised when a streaming response goes idle past stream_idle_timeout.
Distinct from request_timeout (the per-completion deadline): this fires
when the time between successive chunks exceeds the threshold,
catching half-open streams and partial provider outages without
killing fast streams that legitimately take a long time end-to-end.