Dynamic re-planning¶
A planner that emits one round of independent tasks at a time, runs them in parallel, sees the results, then emits the next round. Adaptive: each round is informed by the previous one. The LangGraph-equivalent shape is "ReAct-on-tasks", but the planning unit is a batch of tasks rather than a single tool call.
Source¶
"""Dynamic planner with re-planning, parallel execution, and checkpoint/resume.
Pattern
-------
A planner agent reasons about the user's query and emits a *round* of
independent tasks (run in parallel). After each round it sees the results
and either emits another round or declares the work done. This is "ReAct
on tasks" — the planner re-plans every round, adapting to intermediate
findings instead of committing to a fixed task list up front.
Why not :class:`lazybridge.Plan`?
``Plan`` is compiled at construction time — its DAG is fixed. This file
targets the case where the *shape* of the work depends on the query and
on intermediate results.
Why :class:`lazybridge.ReplanEngine` instead of a raw Python loop?
``ReplanEngine`` is the guardian: it checkpoints after every round so a
restart continues from the correct round rather than re-executing completed
work. Pass ``store=`` and ``checkpoint_key=`` to enable persistence.
Architecture (LazyBridge "everything is a tool")
-------------------------------------------------
The planner and all workers are tools in the guardian Agent's tool_map:
guardian.tools = [planner, research_agent, math_agent, writer_agent]
↑ (output=PlanRound) ↑ workers dispatched by ReplanEngine
The planner receives the available tool schemas + history dynamically — its
system prompt does not need to hardcode worker names.
"""
from __future__ import annotations
from lazybridge import Agent, LLMEngine, ReplanEngine
from lazybridge.engines.replan import PlanRound
# ---------------------------------------------------------------------------
# 1. Sub-agents — each owns its own tool set / system prompt
# ---------------------------------------------------------------------------
def web_search(query: str) -> str:
"""Look up current facts on the web (stub)."""
return f"[stub web result for {query!r}]"
def add(a: float, b: float) -> float:
"""Add two numbers."""
return a + b
def multiply(a: float, b: float) -> float:
"""Multiply two numbers."""
return a * b
research_agent = Agent(
engine=LLMEngine(
"deepseek-v4-flash",
system="You look up current facts via web_search. You do not do math.",
),
tools=[web_search],
name="research",
description="Web lookups, facts, news. Cannot do math.",
)
math_agent = Agent(
engine=LLMEngine(
"deepseek-v4-flash",
system="You solve arithmetic with the add/multiply tools. One tool at a time.",
),
tools=[add, multiply],
name="math",
description="Arithmetic only.",
)
writer_agent = Agent(
engine=LLMEngine(
"deepseek-v4-flash",
system="You synthesise prior results into clear prose. No new facts.",
),
name="writer",
description="Synthesise prior results into prose. Adds no new facts.",
)
# ---------------------------------------------------------------------------
# 2. Planner — emits PlanRound each turn
#
# The system prompt is minimal: ReplanEngine injects the available tool
# schemas and the accumulated history into every planner call dynamically.
# ---------------------------------------------------------------------------
PLANNER_SYSTEM = """\
You are a task planner. Each turn you receive:
- "Available tools:" — the tool names, signatures, and descriptions
- "Task:" — the original user query
- "History:" — outputs from prior rounds
Produce ONE PlanRound. Rules:
1. Tasks within a round run IN PARALLEL — put dependent tasks in the next round.
2. Use tool names and kwargs exactly as listed in "Available tools".
3. When the question is answered, set done=true and put the answer in final_answer.
4. Be greedy with parallelism: independent lookups belong in the same round.
"""
planner = Agent(
engine=LLMEngine("deepseek-v4-flash", system=PLANNER_SYSTEM),
output=PlanRound,
name="planner",
)
# ---------------------------------------------------------------------------
# 3. Guardian — ReplanEngine wraps the replan loop with checkpoint/resume
# ---------------------------------------------------------------------------
guardian = Agent(
engine=ReplanEngine(max_rounds=10), # add store= + checkpoint_key= for persistence
tools=[planner, research_agent, math_agent, writer_agent],
name="guardian",
)
# ---------------------------------------------------------------------------
# 4. Entry point
# ---------------------------------------------------------------------------
def main() -> None:
query = (
"What is the combined headcount of Apple and Google in 2024, and "
"write a one-paragraph note on what those numbers say about the "
"two companies' staffing strategies?"
)
env = guardian(query)
if env.error:
print(f"ERROR: {env.error.message}")
else:
print("\n=== FINAL ANSWER ===\n" + (env.text() or ""))
if __name__ == "__main__":
main()
Walkthrough¶
PlanRoundis a Pydantic schema — the planner emits a list of tasks for the next round plus adone: boolflag. The outer loop dispatches all tasks concurrently viaasyncio.gather, collects results, feeds them back to the planner.max_roundsis the safety net for bad termination logic — ifdone=Falsekeeps firing forever, the loop bails. Set it defensively.- Per-task sequential flag lets a round mix parallel and sequential tasks: the planner can declare that a specific task must wait for the others to finish before running.
Variations¶
- Add a
verify=judgeon the planner agent itself to gate termination — the judge sees the latest round's results and decides whetherdone=Trueis justified. - Persist round results to a
Storeso a debugger / dashboard can watch progress in real time. - Replace the manual
asyncio.gatherwithPlanparallel bands (Step(parallel=True)) if the round structure is fixed enough to declare up-front.
Variations — anti-patterns¶
- Pathological case: planner emits
done=Falseand an empty task list — the loop spins. The example file's source comment (line 196 in the upstream version) documents this; mitigate by adding a "no-tasks → final answer" branch.
See also¶
- ReplanEngine — the engine that wraps
this loop with structured rounds and checkpoint/resume, so you don't
hand-roll the
asyncio.gatherorchestration. - Plan — declared alternative when the structure is known up front.
- Parallel —
Agent.parallelis the application-layer fan-out used inside each round. - Agent builds a plan — typed-spec alternative when the topology is decided once rather than re-emitted every round.