Agent builds a plan¶
A planning agent emits a structured PlanSpec (Pydantic) that's
materialised into a live Plan with typed steps. Demonstrates the
"Plan as data" pattern — the LLM decides the topology, the runtime
validates it at construction (compile-time DAG checks), then
executes deterministically.
Supports a one-shot mode and a re-planning loop.
Source¶
"""Planner agent emits a typed PlanSpec; we materialise it into a real Plan.
Why this is the cleanest "planner builds a plan" pattern in LazyBridge
---------------------------------------------------------------------
- The planner returns a ``PlanSpec`` (Pydantic). No free-form text to parse.
- ``materialize()`` turns the spec into a real ``Plan(Step(...), ...)`` with
live agents bound to step targets. ``PlanCompiler`` validates the DAG when
the Plan is wrapped by ``Agent(engine=plan)`` — broken DAGs (forward
``from_step`` references, unknown step names, duplicates) surface as
``PlanCompileError`` *before* any LLM call.
- ``Step(parallel=True)`` runs siblings concurrently. No asyncio loop in user
code: ``Plan`` does the dispatch.
- Re-planning is opt-in (``solve(replan=True)``): after the Plan finishes, the
planner sees the result and either declares done or builds a new Plan.
When to prefer the "asyncio loop" pattern in ``dynamic_planner.py`` instead:
when you need to feed *partial* results back mid-round (the planner inspects
results before the round finishes). Plan-based execution is round-atomic.
"""
from typing import Literal
from pydantic import BaseModel, Field
from lazybridge import Agent, LLMEngine, Plan, Step, from_parallel, from_prev, from_step
# ---------------------------------------------------------------------------
# 1. Sub-agents (the registry the planner is allowed to pick from)
# ---------------------------------------------------------------------------
def web_search(query: str) -> str:
"""Look up current facts (stub)."""
return f"[stub web result for {query!r}]"
def add(a: float, b: float) -> float:
"""Add two numbers."""
return a + b
def multiply(a: float, b: float) -> float:
"""Multiply two numbers."""
return a * b
research_agent = Agent(
engine=LLMEngine("gpt-5.4-mini", system="Look up facts via web_search."),
tools=[web_search],
name="research",
)
math_agent = Agent(
engine=LLMEngine("gpt-5.4-mini", system="Solve arithmetic with add/multiply."),
tools=[add, multiply],
name="math",
)
writer_agent = Agent(
engine=LLMEngine("gpt-5.4-mini", system="Synthesise prior results into prose."),
name="writer",
)
REGISTRY: dict[str, Agent] = {
"research": research_agent,
"math": math_agent,
"writer": writer_agent,
}
# ---------------------------------------------------------------------------
# 2. PlanSpec — what the planner emits (structured output)
# ---------------------------------------------------------------------------
class StepSpec(BaseModel):
name: str = Field(..., description="Unique step name; used by from_step references.")
agent: Literal["research", "math", "writer"] = Field(..., description="Which sub-agent runs this step.")
task_kind: Literal["literal", "from_prev", "from_step", "from_parallel"] = Field(
default="from_prev",
description=(
"How this step receives its task: literal=use task_text, "
"from_prev=output of previous step, from_step=output of named step, "
"from_parallel=list of envelopes from a parallel sibling group."
),
)
task_text: str | None = Field(default=None, description="Required when task_kind='literal'.")
task_step: str | None = Field(
default=None,
description="Required when task_kind='from_step' or 'from_parallel'.",
)
parallel: bool = Field(default=False, description="If True, run concurrently with adjacent parallel siblings.")
class PlanSpec(BaseModel):
reasoning: str = Field(..., description="Why this DAG was chosen.")
steps: list[StepSpec] = Field(default_factory=list, description="DAG steps in order.")
done: bool = Field(default=False, description="Set True with no steps to short-circuit.")
final_answer: str | None = Field(default=None, description="If done=True, the user-facing answer.")
PLANNER_SYSTEM = f"""\
You are a planner that produces a small DAG of work.
Available sub-agents (pick exactly one per step):
- research : web lookups. Cannot do math.
- math : arithmetic only.
- writer : synthesise prior results into prose. Adds no new facts.
You emit a PlanSpec with ordered steps. Each step has:
- name : unique short identifier (snake_case)
- agent : one of {sorted(REGISTRY)}
- task_kind : 'literal' (provide task_text), 'from_prev' (default — uses
previous step's output as task), 'from_step' (uses named
step's output; provide task_step), 'from_parallel' (gather
list of envelopes from a parallel sibling group)
- parallel : True to run concurrently with adjacent parallel siblings
Rules:
1. The first step must use task_kind='literal' (no previous step exists).
2. To fan out, mark several adjacent steps parallel=True; their join step
should use task_kind='from_parallel' and task_step=<name of first parallel sibling>.
3. If the question can be answered without any work, set done=true and put
the answer in final_answer (leave steps empty).
4. End the DAG with a 'writer' step when prose is required.
"""
planner = Agent(
engine=LLMEngine("gpt-5.4-mini", system=PLANNER_SYSTEM),
output=PlanSpec,
name="planner",
)
# ---------------------------------------------------------------------------
# 3. Materialise a PlanSpec into a real Plan (compile-time validation)
# ---------------------------------------------------------------------------
def materialize(spec: PlanSpec) -> Plan:
steps: list[Step] = []
for s in spec.steps:
if s.task_kind == "literal":
if not s.task_text:
raise ValueError(f"step {s.name!r}: task_kind='literal' needs task_text")
task = s.task_text
elif s.task_kind == "from_step":
if not s.task_step:
raise ValueError(f"step {s.name!r}: task_kind='from_step' needs task_step")
task = from_step(s.task_step)
elif s.task_kind == "from_parallel":
if not s.task_step:
raise ValueError(f"step {s.name!r}: task_kind='from_parallel' needs task_step")
task = from_parallel(s.task_step)
else: # from_prev
task = from_prev
steps.append(
Step(
target=REGISTRY[s.agent],
name=s.name,
task=task,
parallel=s.parallel,
)
)
# Plan() runs PlanCompiler — bad references raise PlanCompileError here.
return Plan(*steps, max_iterations=max(20, len(steps) * 3))
# ---------------------------------------------------------------------------
# 4. solve() — one-shot or replan
# ---------------------------------------------------------------------------
def _format_with_history(query: str, history: list[str]) -> str:
if not history:
return query
rounds = "\n\n".join(f"--- prior round {i + 1} ---\n{h}" for i, h in enumerate(history))
return f"User query: {query}\n\nPrior plan results:\n{rounds}\n\nDecide next plan or set done=true."
def solve(query: str, *, replan: bool = False, max_rounds: int = 5) -> str:
"""Execute one Plan. If replan=True, loop until the planner sets done=true."""
history: list[str] = []
for round_num in range(1, max_rounds + 1):
env = planner(_format_with_history(query, history))
spec: PlanSpec | None = env.payload
if spec is None:
# Typical when ``ANTHROPIC_API_KEY`` is missing — the
# request errored and the typed payload is None.
err = env.error or RuntimeError("planner returned no payload")
return f"planner unavailable: {type(err).__name__}: {err}"
print(f"\n=== plan round {round_num} ===")
print("reasoning:", spec.reasoning)
if spec.done:
print("planner: DONE")
return spec.final_answer or (history[-1] if history else "")
if not spec.steps:
return spec.final_answer or "planner emitted empty plan; aborting"
plan = materialize(spec) # ← compile-time validated
print(f"materialised plan with {len(spec.steps)} step(s); running…")
result = Agent(engine=plan)(query).text()
history.append(result)
if not replan:
return result
return f"max_rounds reached; partial: {history[-1] if history else ''}"
# ---------------------------------------------------------------------------
# 5. Entry point
# ---------------------------------------------------------------------------
def main() -> None:
query = (
"Get the headcounts of Apple, Google, and Meta in 2024 (you can look "
"these up in parallel). Then compute the total. Finally, write a "
"short paragraph commenting on the total."
)
answer = solve(query, replan=False) # set replan=True for adaptive multi-round
print("\n=== FINAL ANSWER ===\n" + answer)
if __name__ == "__main__":
main()
Walkthrough¶
PlanSpecis a Pydantic schema — the planning agent emits a JSON-serialisable spec (steps, sentinels, parallel flags) and a builder function compiles it into a realPlan.PlanCompilercatches dangling references / unknown targets at construction, before any LLM call runs.- Re-planning loop: when
replan=True, the planner sees the results of the previous round and emits a newPlanSpec. Bound the loop with a counter or a "done" predicate — there's no built-in safety net beyondPlan(max_iterations=...). - Sentinels in the spec (
from_prev,from_step("name"),from_parallel("name")) translate from JSON strings back into the Python sentinel objects.
Variations¶
- Drop the re-planning loop and run the materialised plan once — cheaper, deterministic.
- Persist the
PlanSpec(json.dumps(spec.model_dump())) for audit / replay. See Plan serialization for the runtime side. - Constrain the planner's choice of targets via the schema
(
Literal[...]field for steptarget) so the LLM can't fabricate non-existent agents.
See also¶
- Plan — the runtime side.
- Plan serialization — for persisting the spec across processes.
- Dynamic re-planning — sibling pattern using a flat round-of-tasks shape.
- Plan tool — the pre-built factory alternative.