Session & observability

Session is the event bus that fans observability events into exporters and exposes the GraphSchema topology view. Six core exporter classes ship under lazybridge.*; OTelExporter lives under lazybridge.ext.otel (see Extension engines).

For narrative usage see Guides → Mid → Session, Guides → Full → Exporters, and Guides → Full → GraphSchema.

Symbol Role
Session The event bus. Attached to an Agent via session= (or implicitly via verbose=True).
EventLog SQLite-backed event store under Session.events.
EventType StrEnum of emitted event kinds (AGENT_START, TOOL_CALL, AGENT_FINISH, …).
GraphSchema Topology view of registered agents + tool edges; renderable via Session.graph.to_json().
EventExporter Base protocol — implement export(event) to add your own sink.
ConsoleExporter Human-readable stdout exporter; used implicitly by verbose=True.
CallbackExporter Routes events to a user-supplied callable.
FilteredExporter Wrap any exporter to forward only events whose type is in event_types.
JsonFileExporter Newline-delimited JSON sink.
StructuredLogExporter Logs events as structured records via the logging module.
OTelExporter (ext) OpenTelemetry GenAI conventions; see Guides → Advanced → OpenTelemetry.

Session

lazybridge.Session

Session(*, db: str | None = None, exporters: list[Any] | None = None, redact: Callable[[dict[str, Any]], dict[str, Any]] | None | Any = _REDACT_UNSET, redact_on_error: Literal['fallback', 'strict'] = 'strict', unsafe_log_payloads: bool = False, console: bool = False, batched: bool = False, batch_size: int = 100, batch_interval: float = 1.0, max_queue_size: int = 10000, on_full: Literal['drop', 'block', 'hybrid'] = 'hybrid', critical_events: frozenset[str] | set[str] | None = None)

Container for observability config: exporters, redaction, EventLog.

Construct a Session.

Back-pressure policy

on_full selects what happens when the batched-writer queue is saturated (batched=True):

  • "hybrid" (default) — block for audit-critical event types (AGENT_*, TOOL_*, HIL_DECISION; override via critical_events=) and silently drop the cheap high-volume ones (LOOP_STEP / MODEL_REQUEST / MODEL_RESPONSE). A buggy slow exporter no longer makes AGENT_FINISH or TOOL_ERROR disappear, while a steady-state telemetry firehose still doesn't add latency to the producer.
  • "block" — back-pressure unconditionally. Pick this when every event must persist (compliance) and producer latency can absorb the wait.
  • "drop" — never back-pressure. Saturation drops events silently (with a doubling-interval warning). Pick this when telemetry must not block production traffic and lossy traces are acceptable.
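The three policies can be sketched with nothing but the standard library. This is a minimal illustration, not lazybridge's implementation: the `enqueue` helper, its `block_timeout` parameter, and the `CRITICAL` set are hypothetical names chosen for the example, and the real default set is whatever `DEFAULT_CRITICAL_EVENT_TYPES` contains.

```python
import queue
import threading
from typing import Optional

# Illustrative subset only; the real defaults live in lazybridge as
# DEFAULT_CRITICAL_EVENT_TYPES and may differ.
CRITICAL = frozenset({"AGENT_START", "AGENT_FINISH", "TOOL_CALL", "TOOL_ERROR", "HIL_DECISION"})


def enqueue(q, event_type, payload, on_full="hybrid", block_timeout: Optional[float] = None) -> bool:
    """Return True if the event was queued, False if it was dropped."""
    must_block = on_full == "block" or (on_full == "hybrid" and event_type in CRITICAL)
    try:
        if must_block:
            q.put((event_type, payload), timeout=block_timeout)  # back-pressure
        else:
            q.put_nowait((event_type, payload))  # never wait
        return True
    except queue.Full:
        return False


q = queue.Queue(maxsize=1)
q.put_nowait(("LOOP_STEP", {}))                # saturate the queue
dropped = not enqueue(q, "MODEL_REQUEST", {})  # non-critical: dropped immediately
threading.Timer(0.05, q.get).start()           # stand-in writer frees one slot
kept = enqueue(q, "AGENT_FINISH", {}, block_timeout=2.0)  # critical: waits for the slot
```

Under "hybrid" the producer only pays latency for the event types it has declared critical, which is the trade-off the default is aiming for.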
Redactor failure modes

redact_on_error governs what happens when a redact callable either raises or returns a non-dict:

  • "strict" (default) — warn once, then drop the event entirely. No record in the EventLog, no export to exporters. Fail-closed: unredacted data can never leak via this session. This is the safe default for compliance workloads where a broken redactor is a bug to be fixed, not a reason to keep leaking.
  • "fallback" — warn once, then record and export the original, unredacted payload. Observability is preserved at the cost of potentially leaking unredacted data through the event bus. Useful for development; opt in explicitly when you want it.

Because the default is "strict", a failing redactor drops the event rather than persisting it unredacted. Pass redact_on_error="fallback" to keep the event flowing unredacted when redaction fails (lossless, but less safe).
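The two failure modes can be sketched as a small helper. This is an assumption-laden illustration rather than the library's code: `apply_redaction` and `broken_redactor` are hypothetical names, and the warn-once bookkeeping described above is left out for brevity.

```python
import warnings
from typing import Any, Callable, Optional

Redactor = Callable[[dict[str, Any]], dict[str, Any]]


def apply_redaction(
    payload: dict[str, Any],
    redact: Optional[Redactor],
    on_error: str = "strict",
) -> Optional[dict[str, Any]]:
    """Return the payload to record, or None when the event must be dropped."""
    if redact is None:
        return payload  # redaction explicitly disabled
    try:
        result = redact(payload)
        if not isinstance(result, dict):
            raise TypeError(f"redactor returned {type(result).__name__}, expected dict")
        return result
    except Exception as exc:
        warnings.warn(f"redactor failed: {exc!r}")
        # "fallback" keeps the original payload; "strict" fails closed.
        return payload if on_error == "fallback" else None


def broken_redactor(payload: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical redactor that always fails."""
    raise ValueError("boom")
```

A caller would skip both the EventLog write and every exporter when the helper returns None, which is what makes "strict" fail-closed.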

Default-safe secret redaction

When redact is not passed, Session defaults to redact_secrets, so well-known credential shapes (sk-..., ghp_..., AIza..., JWTs, Bearer ..., etc.) are stripped from every event payload before it reaches the EventLog or any exporter. Pass redact=None to opt out for a single Session, or unsafe_log_payloads=True for the same effect with a keyword that is easier to search for at the construction site. unsafe_log_payloads only applies when no redact argument is given. Passing your own redact=callable always wins: Session will not stack the default in front of a user redactor.
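The three input states (not passed, explicit None, user callable) rely on a sentinel default. A stand-alone sketch of that resolution follows; `_UNSET`, `default_redactor`, and `resolve_redactor` are hypothetical stand-ins for `_REDACT_UNSET`, `redact_secrets`, and the logic inside the constructor.

```python
from typing import Any, Callable, Optional

_UNSET = object()  # sentinel: distinguishes "not passed" from an explicit None


def default_redactor(payload: dict[str, Any]) -> dict[str, Any]:
    """Placeholder standing in for redact_secrets (not its real behaviour)."""
    return {k: "[REDACTED]" if k == "api_key" else v for k, v in payload.items()}


def resolve_redactor(redact: Any = _UNSET, unsafe_log_payloads: bool = False) -> Optional[Callable]:
    if redact is _UNSET:
        # Nothing passed: safe default unless the caller opted out by keyword.
        return None if unsafe_log_payloads else default_redactor
    # Explicit None or a user callable is taken as-is.
    return redact
```

A plain `redact=None` default could not express this, because None is itself a meaningful value (explicit opt-out).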

Source code in lazybridge/session.py
def __init__(
    self,
    *,
    db: str | None = None,
    exporters: list[Any] | None = None,
    redact: Callable[[dict[str, Any]], dict[str, Any]] | None | Any = _REDACT_UNSET,
    redact_on_error: Literal["fallback", "strict"] = "strict",
    unsafe_log_payloads: bool = False,
    console: bool = False,
    batched: bool = False,
    batch_size: int = 100,
    batch_interval: float = 1.0,
    max_queue_size: int = 10_000,
    on_full: Literal["drop", "block", "hybrid"] = "hybrid",
    critical_events: frozenset[str] | set[str] | None = None,
) -> None:
    """Construct a Session.

    Back-pressure policy
    --------------------
    ``on_full`` selects what happens when the batched-writer queue
    is saturated (``batched=True``):

    * ``"hybrid"`` (default) — block for audit-critical event types
      (``AGENT_*``, ``TOOL_*``, ``HIL_DECISION``; override via
      ``critical_events=``) and silently drop the cheap high-volume
      ones (``LOOP_STEP`` / ``MODEL_REQUEST`` / ``MODEL_RESPONSE``).
      A buggy slow exporter no longer makes ``AGENT_FINISH`` or
      ``TOOL_ERROR`` disappear, while a steady-state telemetry
      firehose still doesn't add latency to the producer.
    * ``"block"`` — back-pressure unconditionally.  Pick this when
      every event must persist (compliance) and producer latency
      can absorb the wait.
    * ``"drop"``  — never back-pressure.  Saturation drops events
      silently (with a doubling-interval warning).  Pick this when
      telemetry must not block production traffic and lossy traces
      are acceptable.

    Redactor failure modes
    ----------------------
    ``redact_on_error`` governs what happens when a ``redact``
    callable either raises or returns a non-dict:

    * ``"strict"`` (default) — warn once, then **drop the event
      entirely**.  No record in the EventLog, no export to
      exporters.  Fail-closed: unredacted data can never leak via
      this session.  This is the safe default for compliance
      workloads where a broken redactor is a bug to be fixed, not
      a reason to keep leaking.
    * ``"fallback"`` — warn once, then record and export the
      **original, unredacted** payload.  Observability is preserved
      at the cost of potentially leaking unredacted data through
      the event bus.  Useful for development; opt in explicitly
      when you want it.

    Because the default is ``"strict"``, a failing redactor drops
    the event rather than persisting it unredacted.  Pass
    ``redact_on_error="fallback"`` to keep the event flowing
    unredacted when redaction fails (lossless, but less safe).

    Default-safe secret redaction
    -----------------------------
    When ``redact`` is not passed, Session defaults to
    :func:`redact_secrets` so well-known credential shapes
    (``sk-...``, ``ghp_...``, ``AIza...``, JWTs, ``Bearer ...``
    etc.) are stripped from every event payload before it hits the
    EventLog or any exporter.  Pass ``redact=None`` to opt out for
    a single Session, or ``unsafe_log_payloads=True`` for the same
    effect with a more searchable construction-site keyword.
    Passing your own ``redact=callable`` always wins — Session
    will not stack the default in front of a user redactor.
    """
    if redact_on_error not in ("fallback", "strict"):
        raise ValueError(
            f"Session(redact_on_error={redact_on_error!r}): must be "
            f"'fallback' (warn + pass through) or 'strict' (warn + "
            f"drop event)."
        )
    # Resolve the redactor.  Three valid input states:
    #   * not passed              → default to redact_secrets (safe)
    #   * redact=None             → explicit opt-out, no redaction
    #   * redact=callable         → user supplies their own
    # ``unsafe_log_payloads=True`` is the searchable alias for
    # explicit opt-out and only applies when no redactor was given.
    if redact is _REDACT_UNSET:
        self._redact = None if unsafe_log_payloads else redact_secrets
    else:
        self._redact = redact  # type: ignore[assignment]
    self.session_id = str(uuid.uuid4())
    self.events = EventLog(
        self.session_id,
        db=db,
        batched=batched,
        batch_size=batch_size,
        batch_interval=batch_interval,
        max_queue_size=max_queue_size,
        on_full=on_full,
        critical_events=critical_events,
    )
    self._exporters: list[Any] = list(exporters or [])
    # ``self._redact`` was resolved above based on the
    # _REDACT_UNSET sentinel + ``unsafe_log_payloads``.
    self._redact_on_error = redact_on_error
    self._lock = threading.Lock()
    # Phase-3 Block J: warn-once-per-(exporter-class, exception-class).
    # Pre-fix every emit() that hit a broken exporter spammed an
    # identical warning, drowning real diagnostics.  Counter maps the
    # ``(exporter_cls, exc_cls)`` pair to the number of suppressed
    # warnings since the first emission so operators can still see
    # the magnitude of the problem.
    self._exporter_warned_keys: set[tuple[str, str]] = set()
    self._exporter_warn_counts: dict[tuple[str, str], int] = {}
    self.graph = GraphSchema(session_id=self.session_id)
    if console:
        # Late import to avoid circular dependency with exporters
        from lazybridge.exporters import ConsoleExporter

        self._exporters.append(ConsoleExporter())

register_agent

register_agent(agent: Any) -> None

Register an agent with this session's graph.

Source code in lazybridge/session.py
def register_agent(self, agent: Any) -> None:
    """Register an agent with this session's graph."""
    self.graph.add_agent(agent)

register_tool_edge

register_tool_edge(from_agent: Any, to_agent: Any, *, label: str = '') -> None

Record a tool-call edge between two registered agents.

Source code in lazybridge/session.py
def register_tool_edge(self, from_agent: Any, to_agent: Any, *, label: str = "") -> None:
    """Record a tool-call edge between two registered agents."""
    from_id = str(getattr(from_agent, "name", "agent"))
    to_id = str(getattr(to_agent, "name", "agent"))
    from lazybridge.graph import EdgeType

    self.graph.add_edge(from_id, to_id, label=label, kind=EdgeType.TOOL)

flush

flush(timeout: float = 5.0) -> None

Drain the EventLog's batched-writer queue.

No-op when batched=False. Useful before a checkpoint or a clean shutdown so recently-emitted events are persisted before the caller proceeds.

Source code in lazybridge/session.py
def flush(self, timeout: float = 5.0) -> None:
    """Drain the EventLog's batched-writer queue.

    No-op when ``batched=False``.  Useful before a checkpoint or
    a clean shutdown so recently-emitted events are persisted
    before the caller proceeds.
    """
    self.events.flush(timeout=timeout)

close

close() -> None

Release the underlying EventLog's SQLite connections.

Idempotent. Call this when a Session's lifetime ends (e.g. end of an HTTP request) so file descriptors don't linger until the owning thread exits. Using Session as a context manager is equivalent. Exporters that expose close() are closed too, which is useful for OTelExporter's orphaned-span cleanup.

Source code in lazybridge/session.py
def close(self) -> None:
    """Release the underlying EventLog's SQLite connections.

    Idempotent.  Call this when a Session's lifetime ends (e.g.
    end of an HTTP request) so file descriptors don't linger until
    the owning thread exits.  Using Session as a context manager is
    equivalent.  Exporters that expose ``close()`` are closed too,
    which is useful for OTelExporter's orphaned-span cleanup.
    """
    self.events.close()
    for exp in list(self._exporters):
        close = getattr(exp, "close", None)
        if callable(close):
            try:
                close()
            except Exception:
                # Exporter shutdown must never mask the real reason
                # Session.close() is being called.
                pass

usage_summary

usage_summary() -> dict[str, Any]

Aggregate token usage and cost across all agent runs in this session.

Returns a dict with:
  • "total": {input_tokens, output_tokens, cost_usd}
  • "by_agent": {agent_name: {input_tokens, output_tokens, cost_usd}}
  • "by_run": {run_id: {agent_name, input_tokens, output_tokens, cost_usd}}

O(events) with TWO queries total (AGENT_START + MODEL_RESPONSE). EventLog.query exposes run_id directly in the result dict.
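To make the return shape concrete, here is a hand-written dict in that shape together with two small helpers that consume it. The numbers, agent names, and run ids are invented for illustration, and `costliest_agent` / `runs_for` are hypothetical helpers, not part of lazybridge.

```python
from typing import Any, Optional

# Invented sample in the documented shape of usage_summary().
summary: dict[str, Any] = {
    "total": {"input_tokens": 1500, "output_tokens": 400, "cost_usd": 0.03},
    "by_agent": {
        "researcher": {"input_tokens": 1000, "output_tokens": 300, "cost_usd": 0.02},
        "writer": {"input_tokens": 500, "output_tokens": 100, "cost_usd": 0.01},
    },
    "by_run": {
        "run-1": {"agent_name": "researcher", "input_tokens": 1000, "output_tokens": 300, "cost_usd": 0.02},
        "run-2": {"agent_name": "writer", "input_tokens": 500, "output_tokens": 100, "cost_usd": 0.01},
    },
}


def costliest_agent(summary: dict[str, Any]) -> Optional[str]:
    """Name of the agent with the highest cost, or None if nothing ran."""
    by_agent = summary["by_agent"]
    if not by_agent:
        return None
    return max(by_agent, key=lambda name: by_agent[name]["cost_usd"])


def runs_for(summary: dict[str, Any], agent_name: str) -> list[str]:
    """Run ids attributed to one agent, sorted for stable output."""
    return sorted(rid for rid, run in summary["by_run"].items() if run["agent_name"] == agent_name)
```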

Source code in lazybridge/session.py
def usage_summary(self) -> dict[str, Any]:
    """Aggregate token usage and cost across all agent runs in this session.

    Returns a dict with:
      - "total": {input_tokens, output_tokens, cost_usd}
      - "by_agent": {agent_name: {input_tokens, output_tokens, cost_usd}}
      - "by_run":   {run_id:    {agent_name, input_tokens, output_tokens, cost_usd}}

    O(events) with TWO queries total (AGENT_START +
    MODEL_RESPONSE).  ``EventLog.query`` exposes ``run_id``
    directly in the result dict.
    """
    # Two bulk queries.  No per-row DB trip.
    agent_starts = self.events.query(event_type=EventType.AGENT_START)
    model_responses = self.events.query(event_type=EventType.MODEL_RESPONSE)

    # Build run_id → agent_name map.
    run_agent: dict[str, str] = {
        row["run_id"]: row["payload"].get("agent_name", "unknown") for row in agent_starts if row.get("run_id")
    }

    total = {"input_tokens": 0, "output_tokens": 0, "cost_usd": 0.0}
    by_agent: dict[str, dict[str, Any]] = {}
    by_run: dict[str, dict[str, Any]] = {}

    for row in model_responses:
        p = row["payload"]
        run_id = row.get("run_id")
        # Prefer the agent_name carried in the MODEL_RESPONSE payload
        # (LLMEngine populates it as the innermost agent name).  Falling
        # back to the AGENT_START → run_id map only matters for legacy
        # / external emitters that don't include the field.
        payload_agent = p.get("agent_name")
        if payload_agent:
            agent_name = str(payload_agent)
        elif run_id:
            agent_name = run_agent.get(run_id, "unknown")
        else:
            agent_name = "unknown"

        in_tok = p.get("input_tokens", 0) or 0
        out_tok = p.get("output_tokens", 0) or 0
        cost = p.get("cost_usd") or 0.0

        total["input_tokens"] += in_tok
        total["output_tokens"] += out_tok
        total["cost_usd"] += cost

        ag = by_agent.setdefault(agent_name, {"input_tokens": 0, "output_tokens": 0, "cost_usd": 0.0})
        ag["input_tokens"] += in_tok
        ag["output_tokens"] += out_tok
        ag["cost_usd"] += cost

        if run_id:
            rn = by_run.setdefault(
                run_id, {"agent_name": agent_name, "input_tokens": 0, "output_tokens": 0, "cost_usd": 0.0}
            )
            rn["input_tokens"] += in_tok
            rn["output_tokens"] += out_tok
            rn["cost_usd"] += cost

    return {"total": total, "by_agent": by_agent, "by_run": by_run}

Event log + types

lazybridge.EventLog

EventLog(session_id: str, db: str | None = None, *, batched: bool = False, batch_size: int = 100, batch_interval: float = 1.0, max_queue_size: int = 10000, on_full: Literal['drop', 'block', 'hybrid'] = 'hybrid', critical_events: frozenset[str] | set[str] | None = None)

SQLite-backed event log. Thread-safe via thread-local connections.

By default record() performs an INSERT + COMMIT per event on the calling thread. That is fine for low event rates but becomes a bottleneck under sustained load. Pass batched=True to delegate persistence to a background daemon thread that drains a bounded queue and commits in batches; the hot path becomes a non-blocking queue.put_nowait.
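The batching idea (commit when the batch is full or the interval has elapsed, whichever comes first) can be sketched independently of SQLite. This is a simplified illustration under stated assumptions: `writer_loop`, `_STOP`, and the `commit` callback are hypothetical, and the real writer also handles flush sentinels and back-pressure.

```python
import queue
import threading
import time

_STOP = object()  # hypothetical shutdown marker


def writer_loop(q, commit, batch_size=100, batch_interval=1.0):
    """Drain q, calling commit(batch) when the batch fills or the interval elapses."""
    batch = []
    deadline = time.monotonic() + batch_interval
    while True:
        try:
            item = q.get(timeout=max(0.0, deadline - time.monotonic()))
        except queue.Empty:
            item = None  # interval elapsed with nothing new
        if item is _STOP:
            if batch:
                commit(batch)  # persist whatever is still pending
            return
        if item is not None:
            batch.append(item)
        now = time.monotonic()
        if len(batch) >= batch_size or now >= deadline:
            if batch:
                commit(batch)
                batch = []
            deadline = now + batch_interval


committed = []  # each entry is one "transaction"
q = queue.Queue(maxsize=10_000)
t = threading.Thread(
    target=writer_loop,
    args=(q, committed.append),
    kwargs={"batch_size": 3, "batch_interval": 0.2},
    daemon=True,
)
t.start()
for i in range(7):
    q.put_nowait(("event", i))  # hot path: enqueue without touching storage
q.put(_STOP)
t.join(timeout=5.0)
```

The producer never touches storage; it only enqueues, so per-event latency is decoupled from commit cost until the queue saturates.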

Source code in lazybridge/session.py
def __init__(
    self,
    session_id: str,
    db: str | None = None,
    *,
    batched: bool = False,
    batch_size: int = 100,
    batch_interval: float = 1.0,
    max_queue_size: int = 10_000,
    on_full: Literal["drop", "block", "hybrid"] = "hybrid",
    critical_events: frozenset[str] | set[str] | None = None,
) -> None:
    self.session_id = session_id
    self._db = db
    if batched:
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
        if batch_interval <= 0:
            raise ValueError(f"batch_interval must be > 0, got {batch_interval!r}")
        if max_queue_size < 1:
            raise ValueError(f"max_queue_size must be >= 1, got {max_queue_size!r}")
        if on_full not in ("drop", "block", "hybrid"):
            raise ValueError(f"on_full must be 'drop', 'block', or 'hybrid', got {on_full!r}")
    # In-memory SQLite needs a SHARED cache otherwise every thread
    # gets its own isolated DB and events emitted from worker
    # threads (e.g. ``SupervisorEngine`` via ``asyncio.to_thread``)
    # land in a DB the main thread can never see.  Using the
    # ``file::memory:?cache=shared`` URI uniquely named per
    # ``session_id`` gives us one shared in-memory DB per Session.
    self._uri: str | None
    if db is None:
        self._uri = f"file:memdb_{session_id}?mode=memory&cache=shared"
    else:
        self._uri = None
    self._local = threading.local()
    self._lock = threading.Lock()
    # Registry of every thread-local connection we've handed out.
    # ``close()`` walks it to release file descriptors deterministically;
    # without this, worker-thread connections leak until GC runs.
    self._all_conns: list[sqlite3.Connection] = []
    self._closed = False
    # Keep one anchor connection alive for in-memory DBs — SQLite
    # drops a ``file::memory:?cache=shared`` DB as soon as the last
    # connection closes, so without the anchor the first cleanup
    # of a thread-local conn would wipe the table.
    if self._uri is not None:
        self._anchor: sqlite3.Connection | None = sqlite3.connect(
            self._uri,
            uri=True,
            check_same_thread=False,
        )
    else:
        self._anchor = None
    self._init_schema()

    # Batched writer state — set up after schema so the background
    # thread can rely on the events table existing on first flush.
    self._batched = batched
    self._batch_size = batch_size
    self._batch_interval = batch_interval
    self._max_queue_size = max_queue_size
    self._on_full = on_full
    # Per-event-type back-pressure — the ``"hybrid"`` policy uses
    # this set to decide which events block under saturation.  A
    # caller-supplied set wins over the defaults (frozen for safety
    # against post-construction mutation).
    self._critical_events: frozenset[str] = (
        frozenset(critical_events) if critical_events is not None else DEFAULT_CRITICAL_EVENT_TYPES
    )
    self._dropped_count = 0
    self._dropped_critical_count = 0
    self._writer_thread: threading.Thread | None = None
    # The queue carries event tuples plus a singleton ``_FLUSH_SENTINEL``
    # (``object()``) used to wake the writer thread on shutdown.
    self._writer_queue: queue.Queue[tuple | object] | None = None
    self._writer_stop = threading.Event()
    if batched:
        self._writer_queue = queue.Queue(maxsize=max_queue_size)
        self._writer_thread = threading.Thread(
            target=self._writer_run,
            name=f"lazybridge-eventlog-{session_id[:8]}",
            daemon=True,
        )
        self._writer_thread.start()
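The shared-cache and anchor-connection comments in the constructor above can be demonstrated with the standard sqlite3 module alone. This is a minimal sketch, assuming a SQLite build with shared-cache support; the table layout and the `worker` function are invented for the example.

```python
import sqlite3
import threading
import uuid

# A uniquely named in-memory database that several connections can share.
uri = f"file:memdb_{uuid.uuid4().hex}?mode=memory&cache=shared"

# Anchor connection: the shared in-memory DB lives as long as at least one
# connection to it stays open.
anchor = sqlite3.connect(uri, uri=True, check_same_thread=False)
anchor.execute("CREATE TABLE events (event_type TEXT)")
anchor.commit()


def worker() -> None:
    # Each thread opens its own connection to the same named database.
    conn = sqlite3.connect(uri, uri=True)
    conn.execute("INSERT INTO events (event_type) VALUES (?)", ("TOOL_CALL",))
    conn.commit()
    conn.close()  # the data survives because the anchor is still open


t = threading.Thread(target=worker)
t.start()
t.join()

rows = anchor.execute("SELECT event_type FROM events").fetchall()
anchor.close()
```

With a plain ":memory:" path each connection would get its own private database, and the row written by the worker would be invisible to the main thread.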

close

close() -> None

Close every thread-local connection and the anchor (if any).

Idempotent. After close() further record / query calls raise RuntimeError. Required for deterministic FD cleanup in long-running services that spawn Sessions per request.

The lock is held across the entire shutdown so a concurrent record that already obtained a connection via _conn can't race ahead and commit against a connection we're about to close — SQLite would otherwise raise ProgrammingError: Cannot operate on a closed database.

When batching is enabled the background writer is signalled to drain its queue and exit before connections are released.

Source code in lazybridge/session.py
def close(self) -> None:
    """Close every thread-local connection and the anchor (if any).

    Idempotent.  After ``close()`` further ``record`` / ``query``
    calls raise ``RuntimeError``.  Required for deterministic FD
    cleanup in long-running services that spawn Sessions per
    request.

    The lock is held across the entire shutdown so a concurrent
    ``record`` that already obtained a connection via ``_conn``
    can't race ahead and commit against a connection we're about
    to close — SQLite would otherwise raise ``ProgrammingError:
    Cannot operate on a closed database``.

    When batching is enabled the background writer is signalled to
    drain its queue and exit before connections are released.
    """
    # Idempotent: a second close() (typically from ``__del__`` at
    # GC time) must not re-trigger flush() — that would push a
    # sentinel into a queue whose writer thread has already
    # exited, causing flush() to block for the full timeout
    # waiting for an ack that never comes.
    if self._closed:
        return
    # Drain + stop the writer first.  Done outside the lock so the
    # writer thread can finish using the EventLog's connections;
    # ``record_many`` checks ``self._closed`` itself.  Order:
    # (1) flush so any pending events land, (2) set stop and push a
    # sentinel so the writer wakes from its long ``queue.get``
    # timeout immediately, (3) join.
    if self._batched and self._writer_thread is not None:
        self.flush(timeout=5.0)
        self._writer_stop.set()
        assert self._writer_queue is not None
        try:
            self._writer_queue.put_nowait(_FLUSH_SENTINEL)
        except queue.Full:
            # Queue is saturated — the writer is busy and will see
            # the stop flag on its next iteration anyway.
            pass
        self._writer_thread.join(timeout=5.0)
    with self._lock:
        if self._closed:
            return
        self._closed = True
        conns = list(self._all_conns)
        self._all_conns.clear()
        anchor = self._anchor
        self._anchor = None
        for c in conns:
            try:
                c.close()
            except sqlite3.Error:
                pass
        if anchor is not None:
            try:
                anchor.close()
            except sqlite3.Error:
                pass

record_many

record_many(rows: list[tuple]) -> None

Insert a batch of pre-serialised rows in a single transaction.

Each row is the 5-tuple (session_id, run_id, event_type, payload_json, ts), which is the on-disk shape, not a dict. Used by the background batched writer; callers should use record instead.

Source code in lazybridge/session.py
def record_many(self, rows: list[tuple]) -> None:
    """Insert a batch of pre-serialised rows in a single transaction.

    Each ``row`` is the 5-tuple
    ``(session_id, run_id, event_type, payload_json, ts)`` —
    the on-disk shape, not a dict.  Used by the background batched
    writer; callers should use :meth:`record` instead.
    """
    if not rows:
        return
    # Fast-path check: if ``close()`` has fired we fail fast instead
    # of executing against a connection that's about to disappear.
    # Same race semantics as :meth:`record`: bounded to a single
    # ``executemany + COMMIT``; SQLite will either succeed or raise
    # ``ProgrammingError``, in which case the background writer logs
    # the error and drops the row batch.
    if self._closed:
        raise RuntimeError("EventLog is closed")
    conn = self._conn()
    conn.executemany(
        "INSERT INTO events (session_id, run_id, event_type, payload, ts) VALUES (?,?,?,?,?)",
        rows,
    )
    conn.commit()

flush

flush(timeout: float = 5.0) -> None

Block until every event submitted before the call is persisted.

No-op when batched=False. Pushes a flush sentinel so the writer commits its current batch immediately rather than waiting for batch_size or batch_interval, then waits on the queue's task_done accounting. Returns early if timeout elapses; the queue may still have items in that case.

Source code in lazybridge/session.py
def flush(self, timeout: float = 5.0) -> None:
    """Block until every event submitted before the call is persisted.

    No-op when ``batched=False``.  Pushes a flush sentinel so the
    writer commits its current batch immediately rather than waiting
    for ``batch_size`` or ``batch_interval``, then waits on the
    queue's ``task_done`` accounting.  Returns early if ``timeout``
    elapses; the queue may still have items in that case.
    """
    if not self._batched or self._writer_queue is None:
        return
    try:
        self._writer_queue.put_nowait(_FLUSH_SENTINEL)
    except queue.Full:
        # If the queue is saturated, fall back to a blocking put so
        # flush still has well-defined semantics — accepting the
        # backpressure cost is the right trade-off here since the
        # caller asked for a synchronous barrier.
        self._writer_queue.put(_FLUSH_SENTINEL)
    # ``Queue.join`` has no timeout in stdlib; use ``unfinished_tasks``
    # plus ``all_tasks_done`` polling under the queue's own lock.
    deadline = time.monotonic() + timeout
    with self._writer_queue.all_tasks_done:
        while self._writer_queue.unfinished_tasks > 0:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            self._writer_queue.all_tasks_done.wait(timeout=remaining)

lazybridge.EventType

Bases: StrEnum

Graph topology

lazybridge.GraphSchema

GraphSchema(session_id: str = '')

Directed graph of agents, routers, and their connections.

Auto-populated by lazybridge.Session as Agents register themselves via session=. Can also be built manually for GUI-driven pipeline construction.

Source code in lazybridge/graph/schema.py
def __init__(self, session_id: str = "") -> None:
    self.session_id = session_id
    self._nodes: dict[str, _BaseNode] = {}
    self._edges: list[Edge] = []

add_agent

add_agent(agent: Any) -> None

Register an Agent (or duck-typed equivalent) as a graph node.

Reads id / name off the agent, and infers provider + model from either legacy _provider_name / _model_name attributes or from agent.engine on the v1 lazybridge.Agent.

Also registers any Python-callable tools the agent exposes as ToolNode stubs so the graph is fully visible before any events are emitted (static inspection / demo mode).

Source code in lazybridge/graph/schema.py
def add_agent(self, agent: Any) -> None:
    """Register an Agent (or duck-typed equivalent) as a graph node.

    Reads ``id`` / ``name`` off the agent, and infers provider + model
    from either legacy ``_provider_name`` / ``_model_name`` attributes
    or from ``agent.engine`` on the v1 :class:`~lazybridge.Agent`.

    Also registers any Python-callable tools the agent exposes as
    ``ToolNode`` stubs so the graph is fully visible before any
    events are emitted (static inspection / demo mode).
    """
    provider, model = _derive_provider_model(agent)
    node_id = str(getattr(agent, "id", None) or getattr(agent, "name", "agent"))

    # Canonical engine kind — lets the viz distinguish LLM nodes from Plan
    # orchestrators or custom engines without needing to parse the model string.
    engine = getattr(agent, "engine", None)
    engine_type = engine.__class__.__name__ if engine is not None else ""

    # Collect names of Python-callable tools (not agent-as-tool entries) for
    # the node badge so the inspector can list them without reading edges.
    tool_map = getattr(agent, "_tool_map", None) or {}
    callable_tool_names: list[str] = []
    for tool_name, tool_obj in tool_map.items():
        # Agent-as-tool wrappers have returns_envelope=True (set by _agent_as_tool).
        # Plain Tool.from_fn() wrappers have returns_envelope=False.
        # This is the only stable discriminator: agent_memory/agent_store can both
        # be None for an agent-as-tool when the source agent has no memory or store.
        if not getattr(tool_obj, "returns_envelope", False):
            callable_tool_names.append(tool_name)

    node = AgentNode(
        id=node_id,
        name=getattr(agent, "name", node_id),
        provider=provider,
        model=model,
        system=getattr(agent, "system", None),
        engine_type=engine_type,
        tools=callable_tool_names,
    )
    self._nodes[node.id] = node

    # Register Python-callable tool functions as ToolNode stubs so the
    # full pipeline topology is visible before execution starts.
    # Agent-as-tool wrappers (returns_envelope=True) register themselves
    # as AgentNodes separately and must not get a duplicate _ToolNode stub.
    for tool_name, tool_obj in tool_map.items():
        if getattr(tool_obj, "returns_envelope", False):
            continue
        tool_id = f"tool:{tool_name}"
        if tool_id not in self._nodes:
            self._nodes[tool_id] = _ToolNode(id=tool_id, name=tool_name)
        self.add_edge(node_id, tool_id, label=tool_name, kind=EdgeType.TOOL)

add_router

add_router(router: Any) -> None

Register a router (e.g. a Plan) as a graph node.

The router object is expected to expose to_graph_node() that returns {id, name, routes, default}.
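A router only needs to be duck-typed against that one method. The class below is a hypothetical example: `KeywordRouter` is not part of lazybridge, and the assumption that `routes` maps a route label to a target node id is mine, since the source only shows that it is dict-like.

```python
from typing import Any, Optional


class KeywordRouter:
    """Hypothetical router; add_router only relies on to_graph_node()."""

    def __init__(self, name: str, routes: dict[str, str], default: Optional[str] = None) -> None:
        self.name = name
        self.routes = routes
        self.default = default

    def to_graph_node(self) -> dict[str, Any]:
        # Keys match what add_router reads: id, name, routes, default.
        return {
            "id": f"router:{self.name}",
            "name": self.name,
            "routes": dict(self.routes),  # assumed shape: label -> target node id
            "default": self.default,
        }


node = KeywordRouter("triage", {"billing": "billing_agent"}, default="general_agent").to_graph_node()
```

Per the source listing, a missing "id" falls back to "name", and a missing "routes" falls back to an empty dict.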

Source code in lazybridge/graph/schema.py
def add_router(self, router: Any) -> None:
    """Register a router (e.g. a Plan) as a graph node.

    The router object is expected to expose ``to_graph_node()`` that
    returns ``{id, name, routes, default}``.
    """
    node_dict = router.to_graph_node()
    node = RouterNode(
        id=node_dict.get("id", node_dict.get("name", "")),
        name=node_dict.get("name", ""),
        routes=node_dict.get("routes", {}),
        default=node_dict.get("default"),
    )
    self._nodes[node.id] = node

clear

clear() -> None

Drop all nodes and edges.

Useful when re-using one Session across multiple pipeline runs and you want each run's graph to start empty. Session's session_id is preserved so event correlation still works.

Source code in lazybridge/graph/schema.py
def clear(self) -> None:
    """Drop all nodes and edges.

    Useful when re-using one ``Session`` across multiple pipeline
    runs and you want each run's graph to start empty.  Session's
    ``session_id`` is preserved so event correlation still works.
    """
    self._nodes.clear()
    self._edges.clear()

save

save(path: str) -> None

Save schema to JSON or YAML depending on file extension.

Source code in lazybridge/graph/schema.py
def save(self, path: str) -> None:
    """Save schema to JSON or YAML depending on file extension."""
    with open(path, "w", encoding="utf-8") as f:
        f.write(self.to_yaml() if _is_yaml_path(path) else self.to_json())

Exporters

lazybridge.EventExporter

Bases: Protocol

Protocol satisfied by all exporter classes.
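Since the protocol is structural, a custom sink only has to provide export(event). The sketch below uses a local stand-in protocol so it runs without lazybridge installed; `ExporterLike` and `ListExporter` are hypothetical names, and treating the event as a dict is an assumption based on CallbackExporter's callable signature.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ExporterLike(Protocol):
    """Local stand-in for EventExporter: anything with export(event)."""

    def export(self, event: dict[str, Any]) -> None: ...


class ListExporter:
    """Hypothetical sink that keeps events in memory, e.g. for tests."""

    def __init__(self) -> None:
        self.events: list[dict[str, Any]] = []

    def export(self, event: dict[str, Any]) -> None:
        self.events.append(event)


sink = ListExporter()
sink.export({"event_type": "AGENT_START", "agent_name": "demo"})
```

An instance like this would presumably be passed as Session(exporters=[sink]); no inheritance is required.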

lazybridge.CallbackExporter

CallbackExporter(*, fn: Callable[[dict[str, Any]], None])

Forward every event to a user-supplied callable.

Source code in lazybridge/exporters.py
def __init__(self, *, fn: Callable[[dict[str, Any]], None]) -> None:
    self._fn = fn

lazybridge.ConsoleExporter

ConsoleExporter(*, stream: Any = None)

Pretty-print events to stdout for human inspection.

Output format (one line per event):

[agent_name] event_type  key=value  key=value

Installed automatically by Session(console=True) and Agent(verbose=True); can also be added manually via Session(exporters=[ConsoleExporter()]).

Source code in lazybridge/exporters.py
def __init__(self, *, stream: Any = None) -> None:
    import sys

    self._stream = stream or sys.stdout

lazybridge.FilteredExporter

FilteredExporter(*, inner: Any, event_types: set[str])

Forward only events whose type is in event_types to inner.
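The wrapper pattern can be sketched in a few lines. This is an illustration only: `FilterSketch` and `CollectingExporter` are hypothetical classes, and reading the event kind from an "event_type" key is an assumption about the event dict's layout.

```python
from typing import Any


class CollectingExporter:
    """Hypothetical inner sink that just remembers what it receives."""

    def __init__(self) -> None:
        self.seen: list[dict[str, Any]] = []

    def export(self, event: dict[str, Any]) -> None:
        self.seen.append(event)


class FilterSketch:
    """Forward an event to the inner exporter only if its type is allowed."""

    def __init__(self, *, inner: Any, event_types: set[str]) -> None:
        self._inner = inner
        self._types = event_types

    def export(self, event: dict[str, Any]) -> None:
        # Assumption: the event dict carries its kind under "event_type".
        if event.get("event_type") in self._types:
            self._inner.export(event)


inner = CollectingExporter()
errors_only = FilterSketch(inner=inner, event_types={"TOOL_ERROR"})
errors_only.export({"event_type": "LOOP_STEP"})
errors_only.export({"event_type": "TOOL_ERROR", "error": "timeout"})
```

Wrapping keeps the inner exporter unaware of filtering, so the same sink can be reused with different allow-lists.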

Source code in lazybridge/exporters.py
def __init__(self, *, inner: Any, event_types: set[str]) -> None:
    self._inner = inner
    self._types = event_types

lazybridge.JsonFileExporter

JsonFileExporter(*, path: str)

Append each event as a JSON line to path.

Keeps the file handle open across calls instead of opening and closing it on every event. Under a typical agent run with 50-200 events, the original per-call open/write/close added an open and a close syscall for every event written. close() is called automatically by Session.close() when it iterates its exporter list.
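A newline-delimited JSON sink with a persistent handle might look like the following. This is a sketch under assumptions rather than the shipped class: `NdjsonSink` is a hypothetical name, and serialising with `default=str` is my choice for non-JSON-native values.

```python
import json
import os
import tempfile
import threading
from typing import Any


class NdjsonSink:
    """Append one JSON object per line, keeping the file handle open."""

    def __init__(self, *, path: str) -> None:
        self._fh = open(path, "a", encoding="utf-8")
        self._lock = threading.Lock()

    def export(self, event: dict[str, Any]) -> None:
        line = json.dumps(event, default=str)  # serialise outside the lock
        with self._lock:
            self._fh.write(line + "\n")

    def close(self) -> None:
        with self._lock:
            if not self._fh.closed:
                self._fh.flush()
                self._fh.close()


path = os.path.join(tempfile.mkdtemp(), "events.jsonl")
sink = NdjsonSink(path=path)
sink.export({"event_type": "AGENT_START", "agent_name": "demo"})
sink.export({"event_type": "AGENT_FINISH", "agent_name": "demo"})
sink.close()
sink.close()  # second call is a no-op

with open(path, encoding="utf-8") as fh:
    events = [json.loads(line) for line in fh]
```

The lock matters because events can be emitted from several threads, and interleaved partial writes would corrupt the line-per-event framing.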

Source code in lazybridge/exporters.py
def __init__(self, *, path: str) -> None:
    self._path = path
    self._fh = open(path, "a", encoding="utf-8")  # noqa: SIM115
    self._lock = threading.Lock()

close

close() -> None

Flush and close the underlying file handle. Idempotent.

Source code in lazybridge/exporters.py
def close(self) -> None:
    """Flush and close the underlying file handle. Idempotent."""
    with self._lock:
        try:
            if not self._fh.closed:
                self._fh.flush()
                self._fh.close()
        except Exception:
            pass

lazybridge.StructuredLogExporter

StructuredLogExporter(*, logger_name: str = 'lazybridge')

Emit each event via Python's logging module.
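One way to route events through logging is to attach the event dict to the log record via `extra`. The record layout of the real exporter is not shown in this reference, so everything below is an assumption: `LogSink`, the `lazybridge_event` attribute, and the "event_type" key are hypothetical.

```python
import logging
from typing import Any


class LogSink:
    """Hypothetical exporter that logs each event at INFO level."""

    def __init__(self, *, logger_name: str = "lazybridge") -> None:
        self._log = logging.getLogger(logger_name)

    def export(self, event: dict[str, Any]) -> None:
        # The whole event rides along under one attribute so its keys cannot
        # collide with built-in LogRecord fields such as "name" or "message".
        self._log.info("%s", event.get("event_type", "event"), extra={"lazybridge_event": event})


class CaptureHandler(logging.Handler):
    """Test helper: keeps emitted records in a list."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


capture = CaptureHandler()
logger = logging.getLogger("lazybridge.sketch")
logger.setLevel(logging.INFO)
logger.addHandler(capture)

LogSink(logger_name="lazybridge.sketch").export({"event_type": "TOOL_CALL", "tool": "search"})
```

A JSON log formatter attached to the same logger could then read the attribute and emit fully structured records.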

Source code in lazybridge/exporters.py
def __init__(self, *, logger_name: str = "lazybridge") -> None:
    self._log = logging.getLogger(logger_name)