
Engines

LLMEngine is the LLM-driven tool-calling loop; Plan is the deterministic-DAG engine and Step is its unit; ReplanEngine is the adaptive counterpart to Plan for pipelines whose shape is decided at runtime by a planner agent (PlanRound / Task are its output schema). PlanCompileError fires at construction for invalid DAGs; ToolTimeoutError and StreamStallError surface from the LLM engine's safety nets.

For narrative usage see Guides → Full → Plan, Step, ReplanEngine, and the Engine protocol (extension surface).

LLM engine

LLMEngine ships several production-grade knobs that are easy to miss in the auto-generated signature below. Quick reference:

| Knob | Default | Purpose |
|---|---|---|
| max_turns | 20 | Cap on tool-calling rounds; prevents runaway loops |
| tool_choice | "auto" | "auto" / "any"; the first-turn force-to-call is mapped per provider |
| max_retries | 3 | Provider transient-error retries with exponential backoff + jitter |
| request_timeout | 120.0 | Per-completion deadline, distinct from Agent(timeout=N) (total run). None disables |
| max_parallel_tools | 8 | Cap on concurrent tool calls within one turn. None = unbounded |
| max_tool_calls_per_turn | None | Cap on tool calls executed per turn; extras get an is_error result. None = execute every call |
| tool_timeout | None | Per-tool asyncio.wait_for deadline; on timeout the result is reported to the model loop with is_error=True |
| stream_idle_timeout | 90.0 | Idle gap between streaming chunks before StreamStallError. Pass None to disable (a one-shot UserWarning is emitted at LLMEngine.__init__ time, not at stream time) |
| stream_buffer | 64 | Bounded queue for streaming producers. Must be ≥ 1 |
| allow_dangerous_native_tools | False | Security gate for CODE_EXECUTION / COMPUTER_USE; opt-in required |
| thinking | False / ThinkingConfig | Extended-thinking opt-in. Anthropic Opus 4.6+ / Claude 4.7 use adaptive thinking (server-managed budget; pass display="omitted" to hide thoughts). OpenAI o1/o3/o4/gpt-5 and Gemini 2.5+ surface reasoning_tokens automatically; passing a ThinkingConfig(effort=...) is forwarded where the provider supports it. See the provider-capability matrix in lazybridge.matrix. |
| strict_multimodal | False | Raise UnsupportedFeatureError when the model doesn't support an attachment modality |

strict_native_tools is not an LLMEngine knob — it lives on BaseProvider (constructor arg + class attribute). Configure it where you build the provider, e.g. AnthropicProvider(..., strict_native_tools=True); LLMEngine reads it off the resolved provider at request time. See BaseProvider.

lazybridge.LLMEngine

LLMEngine(model: str, *, provider: str | None = None, thinking: bool = False, max_turns: int = 20, tool_choice: Literal['auto', 'any'] = 'auto', temperature: float | None = None, system: str | None = None, native_tools: list[NativeTool | str] | None = None, allow_dangerous_native_tools: bool = False, max_retries: int = 3, retry_delay: float = 1.0, request_timeout: float | None = 120.0, max_parallel_tools: int | None = 8, max_tool_calls_per_turn: int | None = None, tool_timeout: float | None = None, stream_idle_timeout: float | None = _USE_DEFAULT_STREAM_IDLE, stream_buffer: int = 64, cache: bool | Any = False, strict_multimodal: bool = False)

Drives the LLM ↔ tool-call loop for a single agent invocation.

Parameters

  • model: Model string, e.g. "claude-opus-4-7". The provider is inferred automatically.
  • thinking: Enable extended thinking (Anthropic) or reasoning (OpenAI o-series).
  • max_turns: Maximum tool-call rounds before giving up with a MaxTurnsExceeded error. Default 20: a plain tool-using task typically completes in 2-5 rounds, so the 20-round budget leaves headroom for deeper reasoning loops without capping legitimate pipelines. Raise it for deliberately long agentic tasks; lower it during development to fail fast.
  • tool_choice: "auto" lets the provider decide when to call tools (default). "any" requires the provider to call at least one tool on the first turn.

When "any" is set, the engine maps it to "required" on the wire (Anthropic and OpenAI both reject the literal string "any" as an unknown tool name). After the model satisfies the "must call at least one tool" contract on the first turn, the engine resets to "auto" for all subsequent turns so the model can produce a final text answer. Without this reset the model would be forced to call tools on every turn and would loop until max_turns is exhausted.
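The first-turn mapping above can be sketched as a pure function. This is a minimal illustration, not LazyBridge's implementation: wire_tool_choice and simulate are hypothetical helpers, and the stand-in model simply calls a tool whenever it is forced to. It shows why the reset matters: without it, "any" keeps forcing tool calls until max_turns runs out.

```python
from __future__ import annotations

from typing import Literal

ToolChoice = Literal["auto", "any"]


def wire_tool_choice(choice: ToolChoice, turn: int) -> str:
    """Hypothetical helper: the tool_choice value sent on the wire for a turn."""
    if choice == "any" and turn == 0:
        # "any" becomes "required", enforced on the first turn only.
        return "required"
    # Every later turn (and every turn under "auto") lets the model answer in text.
    return "auto"


def simulate(choice: ToolChoice, *, reset: bool = True, max_turns: int = 20) -> int:
    """Count turns until a final answer, using a stand-in model that calls a
    tool whenever it is forced to and answers in text otherwise."""
    for turn in range(max_turns):
        if reset:
            wire = wire_tool_choice(choice, turn)
        else:
            # No reset: "any" keeps forcing a tool call on every turn.
            wire = "required" if choice == "any" else "auto"
        if wire != "required":
            return turn + 1
    raise RuntimeError("max_turns exhausted without a final answer")
```

With the reset, simulate("any") finishes on turn 2; simulate("any", reset=False) exhausts max_turns and raises.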

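When the model emits multiple tool calls in a single turn, LazyBridge executes them concurrently via asyncio.gather, with at most max_parallel_tools calls in flight at once (and, if max_tool_calls_per_turn is set, only that many executed). Concurrency is a property of the engine, not a configuration knob: there is no separate "serial" execution path for LLM-emitted tool calls, although max_parallel_tools=1 limits execution to one call at a time.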
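A minimal sketch of bounded concurrent execution for one turn's tool calls, assuming each call is a zero-argument coroutine function. The names run_tool_calls and ToolResult are hypothetical; an asyncio.Semaphore stands in for max_parallel_tools and asyncio.wait_for for tool_timeout, with a timeout turned into an is_error result rather than an exception.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class ToolResult:
    name: str
    content: str
    is_error: bool = False


async def run_tool_calls(
    calls: list[tuple[str, Callable[[], Awaitable[str]]]],
    max_parallel_tools: int | None = 8,
    tool_timeout: float | None = None,
) -> list[ToolResult]:
    """Hypothetical sketch: execute one turn's tool calls concurrently."""
    # The semaphore bounds in-flight calls; None means unbounded.
    sem = asyncio.Semaphore(max_parallel_tools) if max_parallel_tools is not None else None

    async def invoke(fn: Callable[[], Awaitable[str]]) -> str:
        if tool_timeout is None:
            return await fn()
        return await asyncio.wait_for(fn(), timeout=tool_timeout)

    async def one(name: str, fn: Callable[[], Awaitable[str]]) -> ToolResult:
        try:
            if sem is None:
                return ToolResult(name, await invoke(fn))
            async with sem:
                return ToolResult(name, await invoke(fn))
        except asyncio.TimeoutError:
            # Report the timeout back to the model instead of aborting the run.
            return ToolResult(name, f"{name} timed out after {tool_timeout}s", is_error=True)

    # gather preserves input order, so results line up with the calls.
    return list(await asyncio.gather(*(one(name, fn) for name, fn in calls)))
```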

  • temperature: Sampling temperature. None uses the provider default.
  • system: Static system prompt. Agent.sources= / Envelope.context are added on top.
  • native_tools: Provider-native server-side tools, e.g. NativeTool.WEB_SEARCH.
  • max_retries: Retries on transient provider errors (429, 5xx, network/timeout). Default 3, which is production-safe. Pass 0 to disable.
  • retry_delay: Base delay (seconds) for exponential backoff with ±10% jitter.
  • request_timeout: Per-completion deadline in seconds. Caps the time a hung provider can block an agent run. None disables the framework-level timeout and defers to the provider SDK.
  • max_parallel_tools: Maximum number of tool calls executed concurrently within a single model turn. Default 8. None means unbounded: every tool call returned by the model runs in parallel. Set a small integer (e.g. 4-8) to apply backpressure on wide tool fan-outs and prevent thread/socket/DB exhaustion within a single turn.
  • max_tool_calls_per_turn: Maximum number of tool calls executed per model turn, distinct from max_parallel_tools (which only bounds how many run concurrently). None (default) executes every call the model emits. Set it to 1 to keep a multi-agent graph on a single, non-branching path: the first call runs and any extras receive an is_error result block telling the model that only one call per turn is allowed, so it learns to emit fewer. See lazybridge.AgentPool and lazybridge.conclude.
  • tool_timeout: Per-tool deadline in seconds. When set, each tool execution is wrapped in asyncio.wait_for. On timeout the tool's result is reported as is_error=True to the model loop so the model can recover; the run does not abort. None (default) leaves tools unbounded.
  • stream_idle_timeout: Maximum time (seconds) the engine waits between successive streaming chunks before raising StreamStallError. This catches half-open streams without killing long streams that keep producing chunks. Defaults to DEFAULT_STREAM_IDLE_TIMEOUT (90.0 s). Pass an explicit None to disable stall detection; a one-shot UserWarning is emitted because a half-open provider stream then pins the worker indefinitely.
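The retry knobs can be illustrated with a small synchronous sketch. The doubling factor (retry_delay * 2**attempt) is an assumption, since the text above only says "exponential backoff with ±10% jitter"; TransientError, backoff_delays, and call_with_retries are hypothetical names, not LazyBridge APIs.

```python
from __future__ import annotations

import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical stand-in for a retryable provider error (429, 5xx, network)."""


def backoff_delays(max_retries: int = 3, retry_delay: float = 1.0,
                   rng: random.Random | None = None) -> list[float]:
    """Delay before each retry: retry_delay * 2**attempt, scaled by +/-10% jitter."""
    rng = rng or random.Random()
    return [retry_delay * (2 ** attempt) * rng.uniform(0.9, 1.1) for attempt in range(max_retries)]


def call_with_retries(fn: Callable[[], T], max_retries: int = 3, retry_delay: float = 1.0,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying transient failures up to max_retries times (0 disables retries)."""
    delays = backoff_delays(max_retries, retry_delay)
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            sleep(delays[attempt])
    raise AssertionError("unreachable")
```

Injecting sleep keeps the sketch testable without real waits; the jitter spreads simultaneous retries from many workers so they do not hit the provider in lockstep.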

Source code in lazybridge/engines/llm.py
def __init__(
    self,
    model: str,
    *,
    provider: str | None = None,
    thinking: bool = False,
    max_turns: int = 20,
    tool_choice: Literal["auto", "any"] = "auto",
    temperature: float | None = None,
    system: str | None = None,
    native_tools: list[NativeTool | str] | None = None,
    allow_dangerous_native_tools: bool = False,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    request_timeout: float | None = 120.0,
    max_parallel_tools: int | None = 8,
    max_tool_calls_per_turn: int | None = None,
    tool_timeout: float | None = None,
    stream_idle_timeout: float | None = _USE_DEFAULT_STREAM_IDLE,
    stream_buffer: int = 64,
    cache: bool | Any = False,
    strict_multimodal: bool = False,
) -> None:
    self.model = model
    self.thinking = thinking
    self.max_turns = max_turns
    self.max_retries = max_retries
    self.retry_delay = retry_delay
    self.request_timeout = request_timeout
    if max_parallel_tools is not None and max_parallel_tools < 1:
        raise ValueError(f"max_parallel_tools must be >= 1 or None, got {max_parallel_tools!r}")
    if max_tool_calls_per_turn is not None and max_tool_calls_per_turn < 1:
        raise ValueError(f"max_tool_calls_per_turn must be >= 1 or None, got {max_tool_calls_per_turn!r}")
    if tool_timeout is not None and tool_timeout <= 0:
        raise ValueError(f"tool_timeout must be > 0 or None, got {tool_timeout!r}")
    # ``stream_idle_timeout`` has three valid input shapes:
    #   * sentinel (caller did not specify) → use the safe default
    #   * positive float                    → custom timeout
    #   * explicit None                     → disabled, but warn loudly
    # Anything else (zero, negative) raises.
    if stream_idle_timeout is _USE_DEFAULT_STREAM_IDLE:
        stream_idle_timeout = DEFAULT_STREAM_IDLE_TIMEOUT
    elif stream_idle_timeout is None:
        warnings.warn(
            "LLMEngine(stream_idle_timeout=None) disables stream stall "
            "detection.  A half-open provider stream (TCP RST never "
            f"delivered, HTTP/2 PING dropped) will then pin a worker "
            f"indefinitely.  The safe default is "
            f"{DEFAULT_STREAM_IDLE_TIMEOUT}s — pass a positive float to "
            "tune it instead of disabling.",
            UserWarning,
            stacklevel=2,
        )
    elif stream_idle_timeout <= 0:
        raise ValueError(f"stream_idle_timeout must be > 0 or None, got {stream_idle_timeout!r}")
    if stream_buffer < 1:
        raise ValueError(f"stream_buffer must be >= 1, got {stream_buffer!r}")
    self.max_parallel_tools = max_parallel_tools
    self.max_tool_calls_per_turn = max_tool_calls_per_turn
    self.tool_timeout = tool_timeout
    self.stream_idle_timeout = stream_idle_timeout
    # Bounded queue between the streaming producer (provider) and
    # the consumer (the ``stream()`` async generator).  Pre-W4.1
    # the queue was unbounded, so a slow consumer (slow terminal,
    # slow network, blocked downstream) caused the queue to grow
    # without limit while the provider kept pushing tokens.  A
    # bounded queue propagates backpressure all the way to the
    # provider stream — when the consumer pauses, the producer
    # naturally pauses on ``await sink.put()``.
    self.stream_buffer = stream_buffer
    # ``tool_choice="parallel"`` was deprecated in 0.7.0 and removed in
    # 0.7.9.  Concurrent tool execution is the default and cannot be
    # disabled; the model decides how many tools to call per turn and
    # they run via asyncio.gather.  Defensive runtime check for the
    # JSON-deserialisation case (the Literal annotation already covers
    # static callers); cast through Any so mypy --strict doesn't flag
    # the comparison as non-overlapping.
    if cast(Any, tool_choice) == "parallel":
        raise ValueError(
            "LLMEngine(tool_choice='parallel') was removed in 0.7.9.  "
            "Concurrent tool execution is now the default and cannot be "
            "disabled; drop the argument (or use 'auto' / 'any')."
        )
    self.tool_choice = tool_choice
    self.temperature = temperature
    self.system = system
    # CODE_EXECUTION and COMPUTER_USE grant the model arbitrary code
    # execution / OS control, so they require an explicit opt-in.
    # FILE_SEARCH (OpenAI vector-store lookup) is intentionally NOT
    # gated here: it only reaches data the caller has already uploaded
    # to their own vector store and carries no ambient privilege.
    _DANGEROUS = {NativeTool.CODE_EXECUTION, NativeTool.COMPUTER_USE}
    resolved_native = [NativeTool(t) if isinstance(t, str) else t for t in (native_tools or [])]
    if not allow_dangerous_native_tools:
        found = [t for t in resolved_native if t in _DANGEROUS]
        if found:
            raise ValueError(
                f"Native tools {[t.value for t in found]!r} require explicit opt-in "
                f"because they grant the model code-execution or computer-control "
                f"capabilities. Pass allow_dangerous_native_tools=True to confirm."
            )
    self.native_tools: list[NativeTool] = resolved_native
    # Prompt caching — ``cache=True`` enables the default
    # (5-minute TTL on Anthropic; no-op on OpenAI / Google /
    # DeepSeek because they either cache automatically or need a
    # different API).  Callers wanting the 1-hour TTL pass a
    # ``CacheConfig(ttl="1h")`` object directly.
    from lazybridge.core.types import CacheConfig

    if cache is True:
        self.cache: CacheConfig | None = CacheConfig(enabled=True)
    elif cache is False or cache is None:
        self.cache = None
    else:
        self.cache = cache  # assumed CacheConfig
    # When True, ``Envelope.images`` / ``.audio`` reaching a model
    # that does not support that modality raises
    # ``UnsupportedFeatureError`` instead of warning-and-stripping.
    # Off by default so a single agent fleet can mix vision and
    # text-only models without crashing on edge cases.
    self.strict_multimodal = strict_multimodal
    # Provider may be passed explicitly (used by Agent.from_provider
    # when the model is a tier alias like "top" / "cheap" that
    # _infer_provider can't route on its own).  Falls back to the
    # inference heuristic on the model string.
    self.provider = provider or self._infer_provider(model)
    # Bare-provider-alias guard.  ``LLMEngine("anthropic")`` resolves
    # the provider correctly but leaves ``model="anthropic"`` as the
    # literal request payload, which every real API rejects several
    # RTTs later with a cryptic "unknown model" error.  The canonical
    # forms — ``LLMEngine("claude-opus-4-7")`` for a pinned SKU,
    # ``Agent.from_provider("anthropic", tier="medium")`` for a tier
    # alias — already cover both intents.  Only fires when ``provider=``
    # was inferred (not passed explicitly): ``Agent.from_provider``
    # passes ``provider="anthropic"`` alongside ``model="medium"``,
    # which is a legitimate tier-alias path and stays allowed.
    if provider is None and model in self._PROVIDER_ALIASES and self._PROVIDER_ALIASES[model] == model:
        raise ValueError(
            f"LLMEngine({model!r}) is ambiguous: {model!r} is a provider name, "
            f"not a model id.  At request time the provider would be asked for the "
            f"literal model {model!r} and reject it.\n"
            f"  Fix (tier alias — tracks the provider's lineup):\n"
            f'      Agent.from_provider({model!r}, tier="medium")\n'
            f"  Fix (pinned model id):\n"
            f'      Agent(engine=LLMEngine("<model-id>"))   # e.g. "claude-opus-4-7"'
        )

set_default_provider classmethod

set_default_provider(provider: str | None) -> None

Set (or disable) the fallback provider used when no rule matches.

Two common uses:

# Production hardening: raise on unknown models rather than
# silently falling back to the default. Recommended when you
# only ever want the providers you've explicitly registered.
LLMEngine.set_default_provider(None)

# Redirect the safety-net to a different built-in:
LLMEngine.set_default_provider("openai")

Why this helper exists: Agent("grok-2") would default-route to Anthropic and fail several RTTs later with a cryptic provider-side "unknown model" error. Disabling the fallback turns that into a loud ValueError at construction time.
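A sketch of the resolution order this helper controls, assuming aliases are checked before rules and the default provider is consulted last. Router and its rule tables are hypothetical stand-ins, not LazyBridge's internal classes; the point is that a None default turns an unmatched model string into an immediate ValueError.

```python
from __future__ import annotations


class Router:
    """Hypothetical sketch of model-string routing with a configurable fallback."""

    _ALIASES: dict[str, str] = {"anthropic": "anthropic", "openai": "openai"}
    _RULES: list[tuple[str, str, str]] = [
        ("startswith", "claude", "anthropic"),
        ("startswith", "gpt", "openai"),
    ]
    _DEFAULT: str | None = "anthropic"

    @classmethod
    def set_default_provider(cls, provider: str | None) -> None:
        cls._DEFAULT = provider

    @classmethod
    def infer(cls, model: str) -> str:
        m = model.lower()
        if m in cls._ALIASES:  # exact alias match
            return cls._ALIASES[m]
        for kind, pattern, provider in cls._RULES:  # first matching rule wins
            if (m.startswith(pattern) if kind == "startswith" else pattern in m):
                return provider
        if cls._DEFAULT is None:  # fallback disabled: fail loudly, up front
            raise ValueError(f"no provider rule matches model {model!r}")
        return cls._DEFAULT  # silent safety-net
```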

Source code in lazybridge/engines/llm.py
@classmethod
def set_default_provider(cls, provider: str | None) -> None:
    """Set (or disable) the fallback provider used when no rule matches.

    Two common uses::

        # Production hardening: raise on unknown models rather than
        # silently falling back to the default. Recommended when you
        # only ever want the providers you've explicitly registered.
        LLMEngine.set_default_provider(None)

        # Redirect the safety-net to a different built-in:
        LLMEngine.set_default_provider("openai")

    Why this helper exists: ``Agent("grok-2")`` would default-route
    to Anthropic and fail several RTTs later with a cryptic
    provider-side "unknown model" error.  Disabling
    the fallback turns that into a loud ``ValueError`` at
    construction time.
    """
    cls._PROVIDER_DEFAULT = provider

provider_aliases classmethod

provider_aliases() -> dict[str, str]

Return a snapshot of the current model-string → provider alias map.

Callers that need to validate a user-supplied model string (or document the accepted aliases) should read from this dict rather than reach into _PROVIDER_ALIASES directly. The return value is a fresh copy, so accidental mutation can't affect the framework's routing. The aliases are also surfaced in lazybridge.PROVIDER_ALIASES for top-level convenience.

Source code in lazybridge/engines/llm.py
@classmethod
def provider_aliases(cls) -> dict[str, str]:
    """Return a snapshot of the current model-string → provider alias map.

    Callers that need to validate a user-supplied model string (or
    document the accepted aliases) should read from this dict
    rather than reach into ``_PROVIDER_ALIASES`` directly — the
    return value is a fresh copy, so accidental mutation can't
    affect the framework's routing.  The set is also surfaced in
    :data:`lazybridge.PROVIDER_ALIASES` for top-level convenience.
    """
    return dict(cls._PROVIDER_ALIASES)

register_provider_alias classmethod

register_provider_alias(alias: str, provider: str) -> None

Register an exact-match model-string → provider alias.

Example:

LLMEngine.register_provider_alias("mistral", "mistral")
Agent("mistral")   # resolves to the mistral provider

Thread-safe: serialised via _PROVIDER_REGISTRY_LOCK so two threads racing here can't lose a registration to a read-then-write clobber on the class attribute.
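The lock-guarded, copy-on-write update described above can be sketched as follows. AliasRegistry is hypothetical; it rebuilds the dict under a lock and rebinds the class attribute, and its snapshot() returns a fresh copy in the spirit of provider_aliases().

```python
from __future__ import annotations

import threading


class AliasRegistry:
    """Hypothetical sketch of lock-guarded copy-on-write registration."""

    _ALIASES: dict[str, str] = {}
    _LOCK = threading.Lock()

    @classmethod
    def register(cls, alias: str, provider: str) -> None:
        # Without the lock, two threads could both copy the old dict and the
        # later rebind would silently drop the other thread's entry.
        with cls._LOCK:
            cls._ALIASES = {**cls._ALIASES, alias.lower(): provider}

    @classmethod
    def snapshot(cls) -> dict[str, str]:
        # Readers get a fresh copy, so mutating it cannot affect routing.
        return dict(cls._ALIASES)
```

Rebinding a new dict (rather than mutating in place) means a reader that grabbed the old reference keeps a consistent view while a writer is mid-update.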

Source code in lazybridge/engines/llm.py
@classmethod
def register_provider_alias(cls, alias: str, provider: str) -> None:
    """Register an exact-match model-string → provider alias.

    Example::

        LLMEngine.register_provider_alias("mistral", "mistral")
        Agent("mistral")   # resolves to the mistral provider

    Thread-safe: serialised via ``_PROVIDER_REGISTRY_LOCK`` so two
    threads racing here can't lose a registration to a read-then-write
    clobber on the class attribute.
    """
    with cls._PROVIDER_REGISTRY_LOCK:
        cls._PROVIDER_ALIASES = {**cls._PROVIDER_ALIASES, alias.lower(): provider}

register_provider_rule classmethod

register_provider_rule(pattern: str, provider: str, *, kind: Literal['contains', 'startswith'] = 'contains') -> None

Register a substring / prefix routing rule.

New rules take priority over built-ins so you can override default routing without editing the framework source:

LLMEngine.register_provider_rule("claude-opus-5", "anthropic")
Agent("claude-opus-5-20260701")   # routed to anthropic

Thread-safe: serialised via _PROVIDER_REGISTRY_LOCK so two threads racing here can't lose a rule to a read-then-write clobber on the class attribute.
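Why prepending gives user rules priority can be shown with a first-match-wins table. RuleTable and the "builtin-opus-host" provider name are hypothetical; only the prepend-then-scan behavior mirrors the description above.

```python
from __future__ import annotations

from typing import Literal

Kind = Literal["contains", "startswith"]


class RuleTable:
    """Hypothetical first-match-wins routing table."""

    # Stand-in "built-in" rule: (kind, pattern, provider).
    _RULES: list[tuple[Kind, str, str]] = [("contains", "opus", "builtin-opus-host")]

    @classmethod
    def register(cls, pattern: str, provider: str, *, kind: Kind = "contains") -> None:
        # Prepend, so the new rule is checked before every existing one.
        cls._RULES = [(kind, pattern.lower(), provider), *cls._RULES]

    @classmethod
    def route(cls, model: str) -> str | None:
        m = model.lower()
        for kind, pattern, provider in cls._RULES:
            if (m.startswith(pattern) if kind == "startswith" else pattern in m):
                return provider  # first match wins
        return None
```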

Source code in lazybridge/engines/llm.py
@classmethod
def register_provider_rule(
    cls,
    pattern: str,
    provider: str,
    *,
    kind: Literal["contains", "startswith"] = "contains",
) -> None:
    """Register a substring / prefix routing rule.

    New rules take priority over built-ins so you can override default
    routing without editing the framework source::

        LLMEngine.register_provider_rule("claude-opus-5", "anthropic")
        Agent("claude-opus-5-20260701")   # routed to anthropic

    Thread-safe: serialised via ``_PROVIDER_REGISTRY_LOCK`` so two
    threads racing here can't lose a rule to a read-then-write
    clobber on the class attribute.
    """
    with cls._PROVIDER_REGISTRY_LOCK:
        cls._PROVIDER_RULES = [(kind, pattern.lower(), provider), *cls._PROVIDER_RULES]

stream async

stream(env: Envelope[Any], *, tools: list[Tool], output_type: type, memory: Memory | None, session: Session | None) -> AsyncGenerator[str, None]

Stream tokens from the full tool-calling loop.

Yields str tokens as the LLM generates them. Tool calls between turns are executed silently; the next-turn response is then streamed. This means token output is continuous across tool-call boundaries.
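The producer/consumer shape described here can be sketched with a bounded asyncio.Queue, a None sentinel, and an idle timeout on the provider side. This is an illustration under assumptions: fake_provider, stream, and the local StreamStallError are hypothetical stand-ins, and the real engine also runs the tool-calling loop inside its producer. In this sketch the producer skips the sentinel when it is cancelled, because a consumer that has already left would never drain a full queue.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator


class StreamStallError(Exception):
    """Local stand-in for the engine's stall error."""


async def fake_provider(tokens: list[str], gap: float) -> AsyncIterator[str]:
    """Stand-in provider stream: one token every `gap` seconds."""
    for tok in tokens:
        await asyncio.sleep(gap)
        yield tok


async def stream(
    source: AsyncIterator[str], *, buffer: int = 64, idle_timeout: float | None = 90.0
) -> AsyncIterator[str]:
    """Hypothetical sketch: relay tokens through a bounded queue."""
    sink: asyncio.Queue[str | None] = asyncio.Queue(maxsize=buffer)
    chunks = source.__aiter__()

    async def produce() -> None:
        try:
            while True:
                try:
                    tok = await asyncio.wait_for(chunks.__anext__(), timeout=idle_timeout)
                except StopAsyncIteration:
                    break
                except asyncio.TimeoutError:
                    raise StreamStallError(f"no chunk for {idle_timeout}s") from None
                await sink.put(tok)  # blocks while the queue is full (backpressure)
        except asyncio.CancelledError:
            raise  # consumer left early; nobody will read a sentinel
        except Exception:
            await sink.put(None)  # wake the consumer so it re-raises our error
            raise
        await sink.put(None)  # sentinel: normal end of stream

    task = asyncio.create_task(produce())
    cancelled_by_us = False
    try:
        while (tok := await sink.get()) is not None:
            yield tok
    finally:
        if not task.done():  # consumer stopped early: stop the provider too
            task.cancel()
            cancelled_by_us = True
        try:
            await task  # re-raises StreamStallError if the producer failed
        except asyncio.CancelledError:
            if not cancelled_by_us:
                raise
```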

Source code in lazybridge/engines/llm.py
async def stream(
    self,
    env: Envelope[Any],
    *,
    tools: list[Tool],
    output_type: type,
    memory: Memory | None,
    session: Session | None,
) -> AsyncGenerator[str, None]:
    """Stream tokens from the full tool-calling loop.

    Yields str tokens as the LLM generates them. Tool calls between turns
    are executed silently; the next-turn response is then streamed.
    This means token output is continuous across tool-call boundaries.
    """
    run_id = str(uuid.uuid4())
    agent_name = getattr(self, "_agent_name", "agent")

    if session:
        session.emit(EventType.AGENT_START, {"agent_name": agent_name, "task": env.task}, run_id=run_id)

    # Bounded sink — see ``stream_buffer`` on ``__init__``.  The
    # producer ``await sink.put(token)`` naturally blocks when the
    # consumer falls behind; this is the only mechanism that keeps
    # an idle consumer from forcing unbounded memory growth as the
    # provider streams tokens.
    sink: asyncio.Queue[str | None] = asyncio.Queue(maxsize=self.stream_buffer)

    async def _run_loop() -> None:
        try:
            await self._loop(
                env,
                tools=tools,
                output_type=output_type,
                memory=memory,
                session=session,
                run_id=run_id,
                _stream_sink=sink,
            )
        finally:
            await sink.put(None)  # sentinel — loop done

    task = asyncio.create_task(_run_loop())
    cancelled_by_us = False
    try:
        while True:
            token = await sink.get()
            if token is None:
                break
            yield token
    finally:
        # If the consumer broke early (e.g. ``break`` out of the
        # ``async for``), cancel the background loop instead of
        # awaiting it — otherwise the provider keeps streaming
        # into a sink no one is reading, racking up cost and
        # tying up worker capacity for the lifetime of the turn.
        if not task.done():
            task.cancel()
            cancelled_by_us = True
        try:
            await task
        except asyncio.CancelledError:
            if not cancelled_by_us:
                raise
        # Emit AGENT_FINISH regardless of how we exited so streaming
        # runs are observable end-to-end the same way ``run()``
        # invocations are.  The companion AGENT_START is emitted at
        # the top of this method.
        if session:
            session.emit(
                EventType.AGENT_FINISH,
                {"agent_name": agent_name, "cancelled": cancelled_by_us},
                run_id=run_id,
            )

Plan + Step

lazybridge.Plan

Plan(*steps: Step, max_iterations: int = 100, store: Store | None = None, checkpoint_key: str | None = None, resume: bool = False, on_concurrent: Literal['fail', 'fork'] = 'fail')

Structured multi-step execution engine.

Steps run sequentially by default. Routing is explicit at the Step level via Step(routes={...}) (predicate map) or Step(routes_by="field") (LLM-decided via a Literal field on the structured output). Parallel branches are declared via step.parallel=True.

PlanCompiler runs at Agent construction time; errors surface before any LLM call.

Construct a Plan.

Checkpoint / resume

Pass store= and checkpoint_key= to persist minimal plan state (next step to run, writes-bucket values, completed step names) after every step. Pass resume=True together with a populated store[checkpoint_key] to pick up where the previous run stopped (useful after a crash, interrupt, or external pause).

Example:

store = Store(db="run.sqlite")
plan = Plan(
    Step(researcher, writes="research"),
    Step(writer, writes="draft"),
    store=store,
    checkpoint_key="my_pipeline",
    resume=True,
)
Agent(engine=plan)("topic")   # continues if a checkpoint exists

The persisted shape (v2) includes minimal plan state plus serialized StepResult history: {"next_step": str, "kv": {...}, "completed_steps": [...], "status": str, "run_uid": str, "history": [...]}. History is serialized so a resumed run can re-aggregate from_parallel_all bands and nested-cost rollup against completed upstream steps. Only writes-bucket values and step history survive across process boundaries; live in-memory state does not.
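A toy runner makes the checkpoint/resume contract concrete. It is a hypothetical sketch, not Plan: a plain dict replaces Store, each step writes under its own name instead of a writes= bucket, and history/run_uid are omitted. It shows that a resumed run starts at next_step and does not re-execute completed steps.

```python
from __future__ import annotations

from typing import Any, Callable

StepFn = Callable[[dict], Any]


def run_plan(
    steps: list[tuple[str, StepFn]],
    store: dict[str, Any],
    key: str,
    *,
    resume: bool = False,
) -> dict[str, Any]:
    """Hypothetical toy runner; each step writes its output under its own name."""
    names = [name for name, _ in steps]
    state = store.get(key) if resume else None
    if state is None:
        state = {"next_step": names[0], "kv": {}, "completed_steps": [], "status": "running"}
    # A finished checkpoint has next_step=None, so there is nothing left to run.
    start = names.index(state["next_step"]) if state["next_step"] is not None else len(names)
    for i in range(start, len(steps)):
        name, fn = steps[i]
        value = fn(state["kv"])
        last = i + 1 == len(steps)
        state = {
            "next_step": None if last else names[i + 1],
            "kv": {**state["kv"], name: value},
            "completed_steps": [*state["completed_steps"], name],
            "status": "done" if last else "running",
        }
        store[key] = state  # checkpoint after every completed step
    return state
```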

Crash-window durability

Each step writes its checkpoint before the durable store.write(step.writes, value) call. This eliminates double-writes on resume: the checkpoint already records next_step as the following step, so a resumed run does not re-execute the completed step. The trade-off is that a crash in the gap between the checkpoint and the Store write loses the durable Store write. The value still lives in the checkpoint's serialised kv and is read back into in-memory state on resume, so the Plan continues correctly, but sidecar consumers reading the Store directly should reconcile against the checkpoint snapshot rather than assume Store completeness.
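The write ordering and the reconciliation advice can be sketched with two dicts standing in for the checkpoint record and the durable Store. commit_step, missing_from_store, and SimulatedCrash are hypothetical names; the sketch only demonstrates which write a crash in the gap can lose.

```python
from __future__ import annotations

from typing import Any


class SimulatedCrash(Exception):
    """Stands in for the process dying between the two writes."""


def commit_step(
    checkpoints: dict[str, Any],
    durable: dict[str, Any],
    key: str,
    next_step: str | None,
    writes: str,
    value: Any,
    *,
    crash_between: bool = False,
) -> None:
    prev = checkpoints.get(key, {"kv": {}})
    # 1. Checkpoint first. It already points past this step, so a resumed
    #    run never re-executes it and never double-writes the Store.
    checkpoints[key] = {"next_step": next_step, "kv": {**prev["kv"], writes: value}}
    if crash_between:
        raise SimulatedCrash()
    # 2. Durable Store write second: this is the write a crash can lose.
    durable[writes] = value


def missing_from_store(durable: dict[str, Any], checkpoint: dict[str, Any]) -> dict[str, Any]:
    """Values a sidecar reader would miss if it trusted the Store alone."""
    return {k: v for k, v in checkpoint["kv"].items() if durable.get(k) != v}
```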

Concurrency

Every checkpoint write goes through lazybridge.store.Store.compare_and_swap, so two Plan runs can never silently overwrite each other's state. Two policies are available via on_concurrent=:

  • "fail" (default): checkpoint_key identifies a single in-flight run. A second Plan on the same key, while the first is still running, raises ConcurrentPlanRunError. This is the correctness floor; pick it when runs legitimately share state (e.g. graceful crash-resume via resume=True).

  • "fork": checkpoint_key names the pipeline; each .run() claims its own isolated effective key f"{checkpoint_key}:{run_uid}". Many runs of the same pipeline can execute concurrently with no collision. This is the mode you want for fan-out workflows (N backtests / seeds / tickers sharing a pipeline definition). resume is not supported in fork mode because there is no single shared checkpoint to resume; if you need resume, use on_concurrent="fail" with distinct per-run keys.
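A sketch of how compare-and-swap claiming could separate the two policies, assuming a claim record of {"status": "running", "run_uid": ...}. MemoryStore, claim, and the local ConcurrentPlanRunError are hypothetical stand-ins; the real Store.compare_and_swap signature is not shown on this page and may differ.

```python
from __future__ import annotations

import threading
import uuid
from typing import Any, Literal


class ConcurrentPlanRunError(RuntimeError):
    """Local stand-in for the error named above."""


class MemoryStore:
    """Hypothetical in-memory store with an atomic compare-and-swap."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._lock = threading.Lock()

    def read(self, key: str) -> Any:
        with self._lock:
            return self._data.get(key)

    def compare_and_swap(self, key: str, expected: Any, new: Any) -> bool:
        with self._lock:
            if self._data.get(key) != expected:
                return False
            self._data[key] = new
            return True


def claim(store: MemoryStore, checkpoint_key: str,
          on_concurrent: Literal["fail", "fork"] = "fail") -> str:
    """Claim a key for a new run and return the effective key."""
    run_uid = uuid.uuid4().hex
    # "fork" gives every run its own suffixed key, so claims never collide.
    key = f"{checkpoint_key}:{run_uid}" if on_concurrent == "fork" else checkpoint_key
    current = store.read(key)
    if current is not None and current.get("status") == "running":
        raise ConcurrentPlanRunError(f"{key!r} is already claimed by run {current['run_uid']}")
    if not store.compare_and_swap(key, current, {"status": "running", "run_uid": run_uid}):
        # Another run wrote between our read and our swap: treat as a collision.
        raise ConcurrentPlanRunError(f"lost the race to claim {key!r}")
    return key
```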

Source code in lazybridge/engines/plan/_plan.py
def __init__(
    self,
    *steps: Step,
    max_iterations: int = 100,
    store: Store | None = None,
    checkpoint_key: str | None = None,
    resume: bool = False,
    on_concurrent: Literal["fail", "fork"] = "fail",
) -> None:
    """Construct a Plan.

    Checkpoint / resume
    -------------------
    Pass ``store=`` and ``checkpoint_key=`` to persist minimal plan state
    (next step to run, ``writes``-bucket values, completed step names)
    after every step. Pass ``resume=True`` together with a populated
    ``store[checkpoint_key]`` to pick up where the previous run stopped
    (useful after a crash, interrupt, or external pause).

    Example::

        store = Store(db="run.sqlite")
        plan = Plan(
            Step(researcher, writes="research"),
            Step(writer, writes="draft"),
            store=store,
            checkpoint_key="my_pipeline",
            resume=True,
        )
        Agent(engine=plan)("topic")   # continues if a checkpoint exists

    The persisted shape (v2) includes minimal plan state plus serialized
    StepResult history: ``{"next_step": str, "kv": {...},
    "completed_steps": [...], "status": str, "run_uid": str,
    "history": [...]}``.
    History is serialized so a resumed run can re-aggregate
    ``from_parallel_all`` bands and nested-cost rollup against completed
    upstream steps.  Only ``writes``-bucket values and step history
    survive across process boundaries; live in-memory state does not.

    Crash-window durability
    -----------------------
    Each step writes its checkpoint *before* the durable
    ``store.write(step.writes, value)`` call.  This eliminates
    double-writes on resume — the checkpoint already records
    ``next_step`` as the following step, so a resumed run does not
    re-execute the completed step.  The trade-off is that a crash in
    the gap between the checkpoint and the Store write makes the
    durable Store write *lost*; the value still lives in the
    checkpoint's serialised ``kv`` and is read back into in-memory
    state on resume, so the Plan continues correctly, but **sidecar
    consumers reading the Store directly should reconcile against
    the checkpoint snapshot rather than assume Store completeness**.

    Concurrency
    -----------
    Every checkpoint write goes through
    :meth:`lazybridge.store.Store.compare_and_swap`, so two Plan
    runs can never silently overwrite each other's state.  Two
    policies are available via ``on_concurrent=``:

    * ``"fail"`` (default) — ``checkpoint_key`` identifies a single
      in-flight run.  A second Plan on the same key, while the first
      is still running, raises :class:`ConcurrentPlanRunError`.
      This is the correctness floor; pick it when runs legitimately
      share state (e.g. graceful crash-resume via ``resume=True``).

    * ``"fork"`` — ``checkpoint_key`` names the *pipeline*; each
      ``.run()`` claims its own isolated effective key
      ``f"{checkpoint_key}:{run_uid}"``.  Many runs of the same
      pipeline can execute concurrently with no collision.  This
      is the mode you want for fan-out workflows (N backtests /
      seeds / tickers sharing a pipeline definition).  ``resume``
      is not supported in ``fork`` mode because there is no single
      shared checkpoint to resume — if you need resume, use
      ``on_concurrent="fail"`` with distinct per-run keys.

    Example::

        store = Store(db="run.sqlite")
        plan = Plan(
            Step(researcher, writes="research"),
            Step(writer, writes="draft"),
            store=store,
            checkpoint_key="my_pipeline",
            resume=True,
        )
    """
    if on_concurrent not in ("fail", "fork"):
        raise ValueError(
            f"Plan(on_concurrent={on_concurrent!r}): must be one of "
            f"'fail' (default, raise on collision) or 'fork' (isolate "
            f"each run under a suffixed key)."
        )
    if on_concurrent == "fork" and resume:
        raise ValueError(
            "Plan(on_concurrent='fork', resume=True) is not supported: "
            "'fork' gives each run its own key, so there is no single "
            "shared checkpoint to resume from.  Use on_concurrent='fail' "
            "with a unique per-run checkpoint_key if you need resume."
        )
    self.steps = list(steps)
    self.max_iterations = max_iterations
    self._compiler = PlanCompiler()
    self.store = store
    self.checkpoint_key = checkpoint_key
    self.resume = resume
    self.on_concurrent = on_concurrent

run_many

run_many(tasks: list[str | Envelope[Any]], *, concurrency: int | None = None, tools: list[Any] | None = None, memory: Any = None, session: Any = None, output_type: type = str) -> list[Envelope[Any]]

Run this Plan concurrently against N inputs; sync return.

Each task is dispatched as its own Plan.run invocation on a fresh asyncio task; results are returned as a list in input order. Pair with Plan(on_concurrent="fork", ...) for true fan-out workflows where each input claims its own per-run keyspace.

Errors are returned as error envelopes in the corresponding slot — the call never raises (matches Agent.parallel semantics).

concurrency caps the number of in-flight runs via an asyncio semaphore. None (default) lets every task fire immediately.

Pass tools when the Plan's steps use string-name targets that must be resolved against a live tool map. Omitting tools (or passing []) works only when every step target is an Agent object rather than a string alias.

See arun_many for the async variant when the caller is already inside an event loop.
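The fan-out contract (input order preserved, errors captured per slot, optional concurrency cap) can be sketched with asyncio.gather and a semaphore. run_one stands in for a single Plan run and Result for an Envelope; both are hypothetical, and this run_many uses asyncio.run as its sync bridge instead of LazyBridge's context-propagating helper.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class Result:
    """Hypothetical stand-in for an Envelope: either a payload or an error."""
    payload: Any = None
    error: str | None = None


async def arun_many(run_one: Callable[[str], Awaitable[Any]], tasks: list[str], *,
                    concurrency: int | None = None) -> list[Result]:
    sem = asyncio.Semaphore(concurrency) if concurrency is not None else None

    async def guarded(task: str) -> Result:
        try:
            if sem is None:
                return Result(payload=await run_one(task))
            async with sem:
                return Result(payload=await run_one(task))
        except Exception as exc:  # the error lands in this slot; the call never raises
            return Result(error=f"{type(exc).__name__}: {exc}")

    # gather returns results in input order regardless of completion order.
    return list(await asyncio.gather(*(guarded(t) for t in tasks)))


def run_many(run_one: Callable[[str], Awaitable[Any]], tasks: list[str], *,
             concurrency: int | None = None) -> list[Result]:
    # Sync bridge; assumes no event loop is already running in this thread.
    return asyncio.run(arun_many(run_one, tasks, concurrency=concurrency))
```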

Source code in lazybridge/engines/plan/_plan.py
def run_many(
    self,
    tasks: list[str | Envelope[Any]],
    *,
    concurrency: int | None = None,
    tools: list[Any] | None = None,
    memory: Any = None,
    session: Any = None,
    output_type: type = str,
) -> list[Envelope[Any]]:
    """Run this Plan concurrently against ``N`` inputs; sync return.

    Each ``task`` is dispatched as its own ``Plan.run`` invocation
    on a fresh asyncio task; results are returned as a list in
    input order.  Pair with ``Plan(on_concurrent="fork", ...)`` for
    true fan-out workflows where each input claims its own
    per-run keyspace.

    Errors are returned as error envelopes in the corresponding
    slot — the call never raises (matches ``Agent.parallel``
    semantics).

    ``concurrency`` caps the number of in-flight runs via an
    asyncio semaphore.  ``None`` (default) lets every task fire
    immediately.

    Pass ``tools`` when the Plan's steps use string-name targets that
    must be resolved against a live tool map.  Omitting ``tools``
    (or passing ``[]``) works only when every step target is an
    ``Agent`` object rather than a string alias.

    See :meth:`arun_many` for the async variant when the caller is
    already inside an event loop.
    """
    # Re-use the sync-bridge that ``Agent.__call__`` ships with —
    # it propagates contextvars (OTel spans, request ids, …) into
    # the worker loop so observability flows through fan-outs.
    from lazybridge.agent import _run_coro_with_context

    result: list[Envelope[Any]] = _run_coro_with_context(
        self.arun_many(
            tasks,
            concurrency=concurrency,
            tools=tools,
            memory=memory,
            session=session,
            output_type=output_type,
        )
    )
    return result
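The effect of concurrency can be shown with a stand-alone sketch that does not import LazyBridge. It assumes a hypothetical fake_run coroutine in place of Plan.run, and it follows the pattern visible in arun_many's source: an optional asyncio.Semaphore around each run, with asyncio.gather keeping results in input order. The peak counter shows that no more than concurrency runs are in flight at once.

```python
import asyncio


async def fan_out(tasks, run, concurrency=None):
    """Run ``run(task)`` for every task, with at most ``concurrency`` in flight.

    asyncio.gather returns results in the order the awaitables were passed,
    regardless of which run finishes first.
    """
    sem = asyncio.Semaphore(concurrency) if concurrency else None

    async def one(task):
        if sem is None:
            return await run(task)
        async with sem:
            return await run(task)

    return await asyncio.gather(*(one(t) for t in tasks))


# Hypothetical stand-in for Plan.run that records how many runs overlap.
in_flight = 0
peak = 0


async def fake_run(task: str) -> str:
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # simulate provider latency
    in_flight -= 1
    return task.upper()


results = asyncio.run(fan_out(["a", "b", "c", "d", "e"], fake_run, concurrency=2))
print(results, "peak in-flight:", peak)
```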

arun_many async

arun_many(tasks: list[str | Envelope[Any]], *, concurrency: int | None = None, tools: list[Any] | None = None, memory: Any = None, session: Any = None, output_type: type = str) -> list[Envelope[Any]]

Async counterpart to run_many.

Use this directly when you're already inside an event loop and want to await the fan-out without the sync-bridge overhead.

Pass tools when the Plan's steps use string-name targets that must be resolved against a live tool map. Omitting tools (or passing []) works only when every step target is an Agent object rather than a string alias.

Source code in lazybridge/engines/plan/_plan.py
async def arun_many(
    self,
    tasks: list[str | Envelope[Any]],
    *,
    concurrency: int | None = None,
    tools: list[Any] | None = None,
    memory: Any = None,
    session: Any = None,
    output_type: type = str,
) -> list[Envelope[Any]]:
    """Async counterpart to :meth:`run_many`.

    Use this directly when you're already inside an event loop and
    want to ``await`` the fan-out without the sync-bridge overhead.

    Pass ``tools`` when the Plan's steps use string-name targets that
    must be resolved against a live tool map.  Omitting ``tools``
    (or passing ``[]``) works only when every step target is an
    ``Agent`` object rather than a string alias.
    """
    sem = asyncio.Semaphore(concurrency) if concurrency else None
    resolved_tools: list[Any] = tools or []

    async def _one(task: str | Envelope[Any]) -> Envelope[Any]:
        # ``Envelope.from_task`` populates BOTH ``task`` and
        # ``payload`` so the first step's ``from_prev`` resolves to
        # the user's input rather than an empty string.
        env = task if isinstance(task, Envelope) else Envelope.from_task(str(task))

        async def _go() -> Envelope[Any]:
            return await self.run(
                env,
                tools=resolved_tools,
                output_type=output_type,
                memory=memory,
                session=session,
            )

        if sem is None:
            return await _go()
        async with sem:
            return await _go()

    raw = await asyncio.gather(
        *[_one(t) for t in tasks],
        return_exceptions=True,
    )
    # Wrap raised exceptions as error envelopes so the contract is
    # "list of envelopes in input order".  Plan.run normally
    # returns an error envelope itself, so this branch only fires
    # for genuine framework bugs / cancellations.
    return [
        r
        if isinstance(r, Envelope)
        else Envelope.error_envelope(r if isinstance(r, BaseException) else RuntimeError(str(r)))
        for r in raw
    ]

to_dict

to_dict() -> dict[str, Any]

Serialise the Plan's topology to a JSON-compatible dict.

Callables and Agents are serialised by name only; rebind them at load time via from_dict's registry kwarg. Sentinels, writes, parallel flags, iteration limit, and step order are preserved faithfully.

Source code in lazybridge/engines/plan/_plan.py
def to_dict(self) -> dict[str, Any]:
    """Serialise the Plan's topology to a JSON-compatible dict.

    Callables and Agents are serialised by ``name`` only — rebind
    them at load time via :meth:`from_dict`'s ``registry`` kwarg.
    Sentinels, writes, parallel flags, iteration limit, and step
    order are preserved faithfully.
    """
    return {
        "version": 1,
        "max_iterations": self.max_iterations,
        "steps": [_step_to_dict(s) for s in self.steps],
    }

from_dict classmethod

from_dict(data: dict[str, Any], *, registry: dict[str, Any] | None = None) -> Plan

Reconstruct a Plan from a to_dict payload.

registry maps serialised target names back to live callables / Agents. Missing entries for non-tool targets raise KeyError with the offending name, keeping the failure loud rather than producing a silently broken Plan.

Example:

saved = plan.to_dict()                     # store somewhere
...
plan = Plan.from_dict(saved, registry={
    "researcher": researcher_agent,
    "fetch":      fetch_function,
})

Source code in lazybridge/engines/plan/_plan.py
@classmethod
def from_dict(
    cls,
    data: dict[str, Any],
    *,
    registry: dict[str, Any] | None = None,
) -> Plan:
    """Reconstruct a Plan from a ``to_dict`` payload.

    ``registry`` maps serialised target names back to live callables /
    Agents.  Missing entries for non-tool targets raise
    :class:`KeyError` with the offending name — keeping the failure
    loud rather than producing a silently-broken Plan.

    Example::

        saved = plan.to_dict()                     # store somewhere
        ...
        plan = Plan.from_dict(saved, registry={
            "researcher": researcher_agent,
            "fetch":      fetch_function,
        })
    """
    registry = registry or {}
    steps = [_step_from_dict(s, registry) for s in data.get("steps", [])]
    return cls(*steps, max_iterations=data.get("max_iterations", 100))
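The registry lookup can be illustrated with a hypothetical rebind_targets helper. The per-step "kind" field used here to tell tool-map names apart from callables is an assumption of this sketch, not LazyBridge's documented payload format; what it shows is that a missing registry entry raises KeyError naming the target instead of producing a half-bound plan.

```python
from typing import Any


def rebind_targets(data: dict[str, Any], registry: dict[str, Any]) -> list[Any]:
    """Resolve each serialised step target back to a live object.

    Hypothetical format: steps whose "kind" is "tool" keep their string name
    (resolved later against the agent's tool map); every other target must
    appear in ``registry`` or a KeyError naming it is raised.
    """
    targets: list[Any] = []
    for step in data.get("steps", []):
        name, kind = step["target"], step.get("kind", "callable")
        if kind == "tool":
            targets.append(name)
        elif name in registry:
            targets.append(registry[name])
        else:
            raise KeyError(f"no registry entry for step target {name!r}")
    return targets


def fetch(url: str) -> str:
    return url


saved = {
    "version": 1,
    "max_iterations": 100,
    "steps": [{"target": "research", "kind": "tool"}, {"target": "fetch"}],
}

live = rebind_targets(saved, {"fetch": fetch})
print(live)

error_message = ""
try:
    rebind_targets(saved, {})  # "fetch" is missing from the registry
except KeyError as exc:
    error_message = exc.args[0]
print("loud failure:", error_message)
```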

lazybridge.Step dataclass

Step(target: Any, task: Sentinel | str = (lambda: from_prev)(), context: Sentinel | str | list[Sentinel | str] | None = None, sources: list[Any] = list(), writes: str | None = None, input: type = Any, output: type = str, parallel: bool = False, name: str | None = None, routes: dict[str, Callable[[Any], bool]] | None = None, routes_by: str | None = None, after_branches: str | None = None, _name_is_opaque: bool = False)

A single node in a Plan.

target and name are two distinct concepts:

  • target: which tool to call. When a string, it must match the key registered in the parent Agent's tool map, i.e. the name passed to agent.as_tool("name"). This is the link between the Plan and the tools list:

    researcher.as_tool("research")     → tool map key: "research"
    Step("research")                   → target="research", calls that tool
    Step("research", name="phase1")    → calls the "research" tool; the step is named "phase1"

When target is a string and name is omitted, name defaults to target, so in the common case they are the same. They only diverge when you want a display name or routing key that differs from the tool name.

  • name: the step's identity in the plan. Used for routing (routes={"name": predicate}), sentinel lookups (from_step("name")), checkpointing, and display. If routing breaks silently, check that each key in routes= is the name of the intended step, not its target.

Parameters:

Name Type Description Default
target Any

Tool name (str), callable, or Agent. Required.

required
task Sentinel | str

Sentinel or str for the step's task. Default: from_prev.

(lambda: from_prev)()
context Sentinel | str | list[Sentinel | str] | None

Sentinel, str, or list of either for extra context. A list joins its resolved parts with blank-line separators (same shape as sources) so a step can pull data from multiple upstream steps without an intermediate combiner. Each list item is validated independently at compile time. Default: none.

None
sources list[Any]

Live-view objects with a .text() method; their text is injected into the context.

list()
writes str | None

Key under which Envelope.payload is saved in the Store.

None
input type

Expected input payload type (PlanCompiler validates).

Any
output type

Expected output payload type (triggers structured output).

str
parallel bool

True if this step runs concurrently with siblings.

False
name str | None

The step's identity for routing, sentinels, and display. Defaults to target when target is a string.

None
routes dict[str, Callable[[Any], bool]] | None

Predicate-based routing. Mapping {step_name: predicate(envelope) -> bool}. After this step runs, predicates are evaluated in declared order; the first one that returns truthy makes the Plan jump to the corresponding step. If none match (or routes is None), execution falls through linearly to the next declared step. Mutually exclusive with routes_by.

None
routes_by str | None

LLM-decided routing via a named field on the step's structured output. Pass the attribute name (e.g. "kind") — Plan reads env.payload.<name> and, if it's a string matching an existing step name, jumps there. The output model must declare that field as Literal["a", "b", ...] (or Literal[...] | None); compile-time validation rejects values that don't match a step name. Mutually exclusive with routes.

None
after_branches str | None

Exclusive-branch rejoin point. Only valid alongside routes or routes_by. When set, exactly one branch runs (the routed-to step), all other declared steps between the routing step and the rejoin point are skipped, and execution continues at the named step after the branch completes.

Example:

    Step(agent, name="triage", routes_by="severity",
         after_branches="archive"),
    Step(urgent_agent, name="urgent"),
    Step(normal_agent, name="normal"),
    Step(spam_agent, name="spam"),
    Step(archive_agent, name="archive"),   # always runs

Without after_branches, routing is a detour: after the routed-to step, linear progression resumes from its declared position, so all subsequent steps also execute. after_branches replaces that fall-through with a guaranteed jump to the rejoin point. For multi-step branches, pass an Agent(engine=Plan(...)) as the branch step's target.
None

Without after_branches, routing is a detour: after the routed-to step runs, linear progression resumes from its declared position. To make a step terminal, place it at the end of the declared step list. Loops are simply routes back to an earlier step; Plan(max_iterations=...) is the safety net.

ReplanEngine

The adaptive replan-loop engine. A planner tool (built with output=PlanRound) is called every round; ReplanEngine dispatches the tasks it emits and checkpoints after each round. See Guides → Full → ReplanEngine for narrative usage.

lazybridge.ReplanEngine

ReplanEngine(*, planner_name: str = 'planner', store: Store | None = None, checkpoint_key: str | None = None, resume: bool = False, max_rounds: int = 20)

Engine that guards the dynamic replan loop with checkpoint/resume.

The planner and all worker tools are resolved from the tool_map at run time; nothing is injected at construction except configuration:

guardian = Agent(
    engine=ReplanEngine(
        store=Store(db="project.sqlite"),
        checkpoint_key="my-project",
        resume=True,
    ),
    tools=[planner, analyst, coder, pool.as_tool("route")],
)

The planner tool must have output=PlanRound. ReplanEngine builds the planner's input dynamically (tool schemas + history) so the planner does not need a static system prompt that lists worker names.

See lazybridge.engines.replan.PlanRound and lazybridge.engines.replan.Task for the planner output schema.

Source code in lazybridge/engines/replan/_engine.py
def __init__(
    self,
    *,
    planner_name: str = "planner",
    store: Store | None = None,
    checkpoint_key: str | None = None,
    resume: bool = False,
    max_rounds: int = 20,
) -> None:
    self.planner_name = planner_name
    self.store = store
    self.checkpoint_key = checkpoint_key
    self.resume = resume
    self.max_rounds = max_rounds
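The round, checkpoint, and resume cycle can be sketched without LazyBridge. In this hypothetical replan_loop, a plain dict stands in for the Store, the planner is a function of the history that returns (tool, kwargs) pairs, an empty plan means the work is done, and exceeding max_rounds raises. None of these details are taken from ReplanEngine's implementation.

```python
from typing import Any, Callable

Planner = Callable[[list[dict[str, Any]]], list[tuple[str, dict[str, Any]]]]


def replan_loop(planner: Planner, tools: dict[str, Callable[..., Any]],
                store: dict[str, Any], key: str, *, resume: bool = False,
                max_rounds: int = 20) -> list[dict[str, Any]]:
    """Hypothetical replan loop: plan, dispatch, checkpoint, repeat."""
    state = store.get(key) if resume else None
    history: list[dict[str, Any]] = list(state["history"]) if state else []
    start = state["round"] if state else 0
    for rnd in range(start, max_rounds):
        tasks = planner(history)  # the planner sees every earlier output
        if not tasks:
            return history  # an empty plan means the work is done
        for tool, kwargs in tasks:
            history.append({"round": rnd, "tool": tool, "output": tools[tool](**kwargs)})
        # Checkpoint after each round so a crash loses at most the current round.
        store[key] = {"round": rnd + 1, "history": list(history)}
    raise RuntimeError(f"planner did not finish within {max_rounds} rounds")


def planner(history: list[dict[str, Any]]) -> list[tuple[str, dict[str, Any]]]:
    # Toy planner: one "add" task per round until two results exist.
    if len(history) >= 2:
        return []
    return [("add", {"a": len(history), "b": 10})]


tools = {"add": lambda a, b: a + b}
store: dict[str, Any] = {}
fresh = replan_loop(planner, tools, store, "demo")

# Resume from a checkpoint taken after round 0, as if the process had crashed.
crashed = {"demo": {"round": 1, "history": fresh[:1]}}
resumed = replan_loop(planner, tools, crashed, "demo", resume=True)
print([h["output"] for h in fresh], [h["output"] for h in resumed])
```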

lazybridge.PlanRound

Bases: BaseModel

Structured output emitted by the planner agent each round.

ReplanEngine calls the planner tool, deserialises this schema, and dispatches the tasks. Tasks within the same round with parallel=True run concurrently via asyncio.gather; parallel=False tasks run sequentially after the parallel group.

Dependent tasks belong in the next round — after the planner has seen the outputs from this one.
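The within-round ordering can be sketched with asyncio. MiniTask and dispatch_round are hypothetical (including the parallel=False default, which is not documented here). The event log shows the two parallel tasks overlapping and the sequential task starting only after both have finished.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MiniTask:
    """Hypothetical stand-in for Task (the parallel default is assumed)."""
    tool: str
    kwargs: dict[str, Any] = field(default_factory=dict)
    parallel: bool = False


async def dispatch_round(tasks: list[MiniTask], tools: dict[str, Any]) -> list[Any]:
    """Run parallel=True tasks together, then parallel=False tasks one by one."""
    group = [t for t in tasks if t.parallel]
    results = list(await asyncio.gather(*(tools[t.tool](**t.kwargs) for t in group)))
    for t in tasks:
        if not t.parallel:
            results.append(await tools[t.tool](**t.kwargs))
    return results


events: list[str] = []


async def work(label: str, delay: float) -> str:
    events.append(f"start {label}")
    await asyncio.sleep(delay)
    events.append(f"end {label}")
    return label


round_tasks = [
    MiniTask("work", {"label": "a", "delay": 0.02}, parallel=True),
    MiniTask("work", {"label": "b", "delay": 0.01}, parallel=True),
    MiniTask("work", {"label": "c", "delay": 0.0}),
]
results = asyncio.run(dispatch_round(round_tasks, {"work": work}))
print(results)
print(events)
```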

lazybridge.Task

Bases: BaseModel

A single tool call planned for this round.

tool must match a key in the parent Agent's tool_map — an agent, a plain function, a pool route, or any other callable wrapped as a Tool. kwargs are forwarded verbatim to tool.run(**kwargs) so they must match the tool's JSON schema exactly.

Examples:

Task(tool="analyst",  kwargs={"task": "analyse the auth module"})
Task(tool="route",    kwargs={"agent_name": "alice", "task": "write tests"})
Task(tool="add",      kwargs={"a": 3, "b": 7})

Engine errors

lazybridge.PlanCompileError

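Bases: Exception

Raised at Plan construction time when static DAG validation fails, so an invalid DAG is rejected before any step runs.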

lazybridge.PlanRuntimeError

Bases: RuntimeError

Raised when a Plan step misbehaves at runtime in a way that indicates a programming bug — not a recoverable runtime condition.

The canonical case is a Step(routes={...}) predicate that raises an exception during evaluation. The engine wraps the underlying exception in PlanRuntimeError, naming the offending step and target, then propagates it.

Distinct from PlanCompileError (which fires at construction time for static DAG validation) and from ConcurrentPlanRunError (which fires at runtime on a checkpoint compare-and-swap collision). Also distinct from regular Envelope.error_envelope return paths, which signal recoverable runtime failures the caller can handle without a try/except.

lazybridge.PlanPaused

PlanPaused(message: str = 'Plan paused')

Bases: BaseException

Raised by a step target to halt the Plan and persist a resumable checkpoint.

Subclasses BaseException (not Exception) so user code's except Exception clauses do not accidentally swallow the pause signal: the same pattern as KeyboardInterrupt and SystemExit.

The engine catches PlanPaused after the offending step's invocation:

  1. Writes a checkpoint with status="paused" and next_step pointing at the same step (so a future resume=True run re-invokes the step rather than skipping past it).
  2. Returns an Envelope whose error is ErrorInfo(type="PlanPaused", retryable=True, ...) so the caller can detect the pause and arrange for the resume.

Use this when a step needs to halt the pipeline cooperatively — for example, the step has detected that an external precondition isn't met (webhook hasn't arrived, human approval pending) and the rest of the pipeline cannot proceed yet.

Example:

from lazybridge.engines.plan import PlanPaused

def webhook_step(task: str) -> str:
    if not webhook_payload_available():
        raise PlanPaused("waiting for webhook delivery")
    return process(task)

Resume:

# Same Plan, resume=True picks up at the paused step.
Agent(
    engine=Plan(*steps,
                store=store,
                checkpoint_key="run-42",
                resume=True),
    tools=[...],
)("…")

Source code in lazybridge/engines/plan/_types.py
def __init__(self, message: str = "Plan paused") -> None:
    super().__init__(message)
    self.message = message

lazybridge.ConcurrentPlanRunError

Bases: RuntimeError

Raised when two Plan runs race for the same checkpoint_key.

Checkpoints are serialised through lazybridge.store.Store.compare_and_swap, so the first writer wins and any second writer fails fast instead of silently overwriting the first run's state. Derive a unique checkpoint_key per run (e.g. f"pipeline-{uuid.uuid4().hex}") when you need concurrent execution on the same Store.
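The first-writer-wins behaviour can be sketched with an in-memory store. CasStore, its compare_and_swap(key, expected, new) signature, and CheckpointCollision are hypothetical stand-ins; the real Store.compare_and_swap signature is not shown on this page.

```python
import threading
import uuid
from typing import Any


class CasStore:
    """Hypothetical in-memory store with an atomic compare-and-swap."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._lock = threading.Lock()

    def compare_and_swap(self, key: str, expected: Any, new: Any) -> bool:
        # Write only if the stored value still equals what the caller expects.
        with self._lock:
            if self._data.get(key) != expected:
                return False
            self._data[key] = new
            return True


class CheckpointCollision(RuntimeError):
    """Hypothetical stand-in for ConcurrentPlanRunError."""


def claim(store: CasStore, key: str, run_id: str) -> None:
    # A new run expects an empty slot; a second claimant finds the first's state.
    if not store.compare_and_swap(key, None, {"owner": run_id, "step": 0}):
        raise CheckpointCollision(f"checkpoint {key!r} is already owned by another run")


store = CasStore()
claim(store, "pipeline", "run-1")

rejected = False
try:
    claim(store, "pipeline", "run-2")  # same key: fails fast, run-1's state is kept
except CheckpointCollision:
    rejected = True

# Unique per-run keys let both runs share one store without colliding.
claim(store, f"pipeline-{uuid.uuid4().hex}", "run-3")
claim(store, f"pipeline-{uuid.uuid4().hex}", "run-4")
print("second writer rejected:", rejected)
```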

lazybridge.ToolTimeoutError

Bases: Exception

Raised when a tool exceeds LLMEngine.tool_timeout.

The engine catches this internally and reports the failure to the model loop as ToolResultContent(is_error=True) so the model can recover; it does not abort the agent run.

lazybridge.StreamStallError

Bases: Exception

Raised when a streaming response goes idle past stream_idle_timeout.

Distinct from request_timeout (the deadline for a whole completion request): this fires when the time between successive chunks exceeds the threshold, catching half-open streams and partial provider outages without killing healthy streams that legitimately take a long time end-to-end.
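The difference between an idle-gap timeout and a total deadline can be sketched by applying asyncio.wait_for to each __anext__() call rather than to the whole stream. StreamStalled and with_idle_timeout are hypothetical; the steady stream below runs longer in total than the idle limit but never pauses long enough to trip it.

```python
import asyncio
from typing import AsyncIterator


class StreamStalled(Exception):
    """Hypothetical stand-in for StreamStallError."""


async def with_idle_timeout(stream: AsyncIterator[str], idle: float) -> AsyncIterator[str]:
    """Re-yield chunks, failing only if a single gap exceeds ``idle`` seconds."""
    it = stream.__aiter__()
    while True:
        try:
            # The deadline restarts for every chunk; there is no total deadline.
            chunk = await asyncio.wait_for(it.__anext__(), idle)
        except StopAsyncIteration:
            return
        except asyncio.TimeoutError:
            raise StreamStalled(f"no chunk for {idle}s") from None
        yield chunk


async def chunks(gaps: list[float]) -> AsyncIterator[str]:
    # Toy provider stream: sleep for each gap, then emit a chunk.
    for i, gap in enumerate(gaps):
        await asyncio.sleep(gap)
        yield f"chunk{i}"


async def collect(gaps: list[float], idle: float) -> list[str]:
    return [c async for c in with_idle_timeout(chunks(gaps), idle)]


steady = asyncio.run(collect([0.05] * 5, idle=0.2))  # ~0.25 s total, no gap near 0.2 s

stalled = ""
try:
    asyncio.run(collect([0.0, 0.6], idle=0.1))  # one 0.6 s gap trips the limit
except StreamStalled as exc:
    stalled = str(exc)
print(steady, "|", stalled)
```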