State primitives

Memory is the per-agent conversation history layer. Store is the shared, optionally-persistent key-value blackboard.

For narrative usage see Guides → Mid → Memory and Guides → Mid → Store. For wiring data between Plan steps see Sentinels & predicates.

lazybridge.Memory

Memory(*, strategy: Literal['auto', 'sliding', 'summary', 'none'] = 'auto', max_tokens: int | None = 4000, max_turns: int | None = _DEFAULT_MAX_TURNS, store: Any | None = None, summarizer: Any | None = None, summarizer_timeout: float | None = _DEFAULT_SUMMARIZER_TIMEOUT)

Conversation memory with configurable compression strategy.

Per-agent use: memory=Memory() on the Agent — tracks its message history. Shared use: sources=[memory] on multiple Agents — live view of shared text.

The text() method returns the current memory as a context string, re-read on every invocation (live view — never a stale snapshot).

LLM summarization

Pass any callable (typically an Agent) as summarizer= to enable LLM-based compression instead of the keyword-extraction fallback:

summarizer = Agent("claude-haiku-4-5-20251001", system="Summarize conversations concisely.")
memory = Memory(strategy="summary", summarizer=summarizer)
agent  = Agent("claude-opus-4-7", memory=memory)

When compression triggers, the summarizer is called with a formatted transcript of the turns being dropped. Three callable shapes are handled transparently:

  • Sync callable returning a string / Envelope / anything with .text() — called directly.
  • Agent — its __call__ already bridges async internally.
  • Plain async def summarize(prompt): ... — the returned coroutine is driven to completion (in a worker thread when called from inside an event loop, to avoid nested-loop errors).

If the summarizer raises, compression falls back to keyword extraction, so a failed call never leaves a broken or empty summary in its place.
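The three callable shapes above can be handled along the following lines. This is a minimal stdlib-only sketch, not the library's implementation: `call_summarizer`, `sync_summarize`, and `async_summarize` are hypothetical names, and the fallback-on-error path is left out.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor


def call_summarizer(summarizer, prompt, timeout=30.0):
    """Call a sync or async summarizer from synchronous code (hypothetical helper)."""
    result = summarizer(prompt)
    if not inspect.iscoroutine(result):
        return result  # sync callable (or Agent-like object): already finished

    async def _with_deadline(coro):
        # Apply the deadline inside the loop that actually runs the coroutine.
        return await asyncio.wait_for(coro, timeout)

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: drive the coroutine directly.
        return asyncio.run(_with_deadline(result))

    # A loop is already running here and asyncio.run() would raise, so the
    # coroutine is driven on a fresh loop in a worker thread instead.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, _with_deadline(result)).result()


def sync_summarize(prompt):
    return "sync: " + prompt


async def async_summarize(prompt):
    await asyncio.sleep(0)
    return "async: " + prompt


async def caller_inside_loop():
    # Called while a loop is running, so the worker-thread branch is taken.
    return call_summarizer(async_summarize, "turns 1-3")


from_sync = call_summarizer(sync_summarize, "turns 1-3")
from_async = call_summarizer(async_summarize, "turns 1-3")
from_loop = asyncio.run(caller_inside_loop())
```

Note that the worker-thread branch blocks the calling thread until the summarizer finishes or the deadline expires, which is why a timeout matters for async summarizers.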

Create a Memory instance.

Parameters

strategy: Selects when and how history is compressed.

  • "auto": compress when the token budget is exceeded (requires max_tokens to be set).
  • "sliding": compress by turn count regardless of max_tokens; does NOT require a token budget.
  • "summary": same trigger as "sliding", but uses the LLM summarizer instead of keyword extraction.
  • "none": no compression; only the hard max_turns cap applies.

max_tokens: Token budget for strategy="auto". Ignored by "sliding" and "summary" (they compress by turn count). None with strategy="auto" disables compression entirely.

max_turns: Hard cap on retained turns after compression runs. None disables the cap (unbounded history, only safe for short sessions). Default: _DEFAULT_MAX_TURNS (1000).

store: Optional lazybridge.store.Store for persistent memory across sessions.

summarizer: Callable used by strategy="summary", typically an Agent (lazybridge.agent.Agent). See the class docstring for accepted shapes.

summarizer_timeout: Deadline in seconds applied to async summarizer calls. On timeout the keyword-extraction fallback runs and a one-shot warning is emitted (tracked by the _summarizer_warned flag). Must be greater than 0; values under 5 s emit a UserWarning because they tend to time out on every call. None disables the deadline. Default: 30 s.

Source code in lazybridge/memory.py
def __init__(
    self,
    *,
    strategy: Literal["auto", "sliding", "summary", "none"] = "auto",
    max_tokens: int | None = 4000,
    max_turns: int | None = _DEFAULT_MAX_TURNS,
    store: Any | None = None,
    summarizer: Any | None = None,
    summarizer_timeout: float | None = _DEFAULT_SUMMARIZER_TIMEOUT,
) -> None:
    """Create a Memory instance.

    Parameters
    ----------
    strategy:
        ``"auto"``    — compress when token budget is exceeded
        (requires ``max_tokens`` to be set).
        ``"sliding"`` — compress by turn count regardless of
        ``max_tokens``; does NOT require a token budget.
        ``"summary"`` — same trigger as ``"sliding"`` but uses the
        LLM ``summarizer`` instead of keyword extraction.
        ``"none"``    — no compression; only the hard ``max_turns``
        cap applies.
    max_tokens:
        Token budget for ``strategy="auto"``.  Ignored by
        ``"sliding"`` and ``"summary"`` (they compress by turn
        count).  ``None`` with ``strategy="auto"`` disables
        compression entirely.
    max_turns:
        Hard cap on retained turns after compression runs.  ``None``
        disables the cap (unbounded history — only safe for short
        sessions).  Default: :attr:`_DEFAULT_MAX_TURNS` (1000).
    store:
        Optional :class:`~lazybridge.store.Store` for persistent
        memory across sessions.
    summarizer:
        Callable used by ``strategy="summary"`` — typically an
        :class:`~lazybridge.agent.Agent`.  See the class docstring
        for accepted shapes.
    summarizer_timeout:
        Deadline (seconds) applied to async summariser calls.  On
        timeout the keyword-extraction fallback runs and a one-shot
        warning is emitted via :attr:`_summarizer_warned`.  ``None``
        disables the deadline.  Default: 30 s.
    """
    self.strategy = strategy
    self.max_tokens = max_tokens
    self.max_turns = max_turns
    self.store = store
    self._turns: list[_Turn] = []
    self._lock = threading.Lock()
    self._summary: str = ""
    self._overflow_warned = False
    self._summarizer = summarizer
    if summarizer_timeout is not None and summarizer_timeout <= 0:
        raise ValueError(f"summarizer_timeout must be > 0 or None, got {summarizer_timeout!r}")
    # Phase-2 Block C: warn on too-tight timeouts.  The summariser
    # spawns a worker thread + JSON serialisation + LLM round-trip;
    # values under 5 s typically time out on every call (silent
    # always-fail trap).
    if summarizer_timeout is not None and summarizer_timeout < 5.0:
        import warnings as _warnings

        _warnings.warn(
            f"Memory(summarizer_timeout={summarizer_timeout!r}): values under 5.0 s often "
            f"trigger a timeout on every summariser invocation (worker-thread setup + LLM "
            f"round-trip).  Consider 30.0 s (the production default) unless you have data "
            f"showing your summariser is reliably faster.",
            UserWarning,
            stacklevel=2,
        )
    self.summarizer_timeout = summarizer_timeout
    # Guard against overlapping summariser calls when ``add()`` is
    # invoked concurrently.  Only one compression runs at a time;
    # other ``add()``s append turns and skip the compression branch.
    self._compressing = False
    # Separate one-shot warning flags for distinct warning sources so
    # a summariser timeout doesn't silence the turn-cap warning and
    # vice versa.
    self._summarizer_warned = False

messages

messages() -> list[Message]

Return full message list including summary prefix if compressed.

Source code in lazybridge/memory.py
def messages(self) -> list[Message]:
    """Return full message list including summary prefix if compressed."""
    with self._lock:
        result: list[Message] = []
        if self._summary:
            result.append(Message(role=Role.USER, content=f"Context from earlier: {self._summary}"))
            result.append(Message(role=Role.ASSISTANT, content="Understood."))
        for t in self._turns:
            result.append(Message(role=Role.USER, content=t.user))
            result.append(Message(role=Role.ASSISTANT, content=t.assistant))
        return result

text

text() -> str

Return the current memory as a plain-text string (live view): the summary, if any, followed by the five most recent turns.

Source code in lazybridge/memory.py
def text(self) -> str:
    """Return current memory as a plain-text string (live view)."""
    with self._lock:
        parts: list[str] = []
        if self._summary:
            parts.append(self._summary)
        for t in self._turns[-5:]:
            parts.append(f"User: {t.user}\nAssistant: {t.assistant}")
        return "\n\n".join(parts)

lazybridge.Store

Store(db: str | None = None)

Key-value store for PlanState and shared data.

db=None → in-memory (lost on exit). db="<path>" (for example "state.sqlite") → SQLite in WAL mode (persistent).
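For readers unfamiliar with WAL mode, the following stdlib-only sketch shows what a file-backed SQLite database in WAL mode looks like from Python. The path, table name, and columns are assumptions made for illustration; the Store's actual schema is not shown in full here.

```python
import os
import sqlite3
import tempfile

# Hypothetical path and schema, for illustration only.
path = os.path.join(tempfile.mkdtemp(), "state.sqlite")

conn = sqlite3.connect(path)
# WAL is a property of the database file: once set, it persists across connections.
journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
conn.execute("CREATE TABLE IF NOT EXISTS store (key TEXT PRIMARY KEY, value TEXT)")
conn.execute(
    "INSERT OR REPLACE INTO store (key, value) VALUES (?, ?)",
    ("greeting", '"hello"'),
)
conn.commit()
conn.close()

# A new connection (or a new process) sees the committed row.
reopened = sqlite3.connect(path)
value = reopened.execute("SELECT value FROM store WHERE key=?", ("greeting",)).fetchone()[0]
reopened.close()
```

WAL lets readers proceed while a writer is active, which suits a blackboard shared by several agents.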

Source code in lazybridge/store/__init__.py
def __init__(self, db: str | None = None) -> None:
    self._db = db
    self._local = threading.local()
    # ``RLock`` (not ``Lock``) so internal helpers like
    # ``_discard_thread_conn`` can safely re-enter the lock when
    # called from a code path (``compare_and_swap``) that already
    # holds it.
    self._lock = threading.RLock()
    # Track every opened thread-local connection so we can close
    # them deterministically from ``close()``.  ``threading.local``
    # only scopes attributes per thread; without a registry the
    # connections linger until the owning thread exits + GC runs,
    # which leaks file descriptors under worker pools.
    self._all_conns: list[sqlite3.Connection] = []
    self._closed = False
    if not db:
        self._mem: dict[str, StoreEntry] = {}
    self._init_schema()

close

close() -> None

Close every thread-local SQLite connection opened by this Store.

Idempotent. After close() the Store raises RuntimeError on further reads / writes so callers fail fast instead of silently re-opening connections.

Source code in lazybridge/store/__init__.py
def close(self) -> None:
    """Close every thread-local SQLite connection opened by this Store.

    Idempotent.  After ``close()`` the Store raises ``RuntimeError``
    on further reads / writes so callers fail fast instead of
    silently re-opening connections.
    """
    with self._lock:
        if self._closed:
            return
        self._closed = True
        conns = list(self._all_conns)
        self._all_conns.clear()
    for c in conns:
        try:
            c.close()
        except sqlite3.Error:
            pass  # already closed / invalid — nothing to recover

write

write(key: str, value: Any, *, agent_id: str | None = None) -> None

Store value under key.

Copy semantics (in-memory path): the value is deep-copied before storage via _deep_copy_safe, so that callers mutating the original object after write() cannot silently alter the stored value and defeat compare_and_swap. The SQLite path gets equivalent isolation for free via the JSON round-trip.

Pydantic instances are serialised via model_dump(mode="json") before JSON encoding — otherwise SQLite storage would fall through default=str and persist the model's __repr__ (e.g. "x=42 name='hello'"), which is NOT round-trippable. In-memory storage keeps the instance as-is since Python dicts don't need JSON round-tripping.
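Both points can be reproduced with the standard library alone. In the sketch below a dataclass named `Note` stands in for a Pydantic model and `dataclasses.asdict` plays the role of `model_dump(mode="json")`; these substitutions are assumptions made so the example runs without Pydantic.

```python
import copy
import json
from dataclasses import asdict, dataclass


@dataclass
class Note:  # hypothetical stand-in for a Pydantic model
    x: int
    name: str


note = Note(x=42, name="hello")

# default=str falls back to str(obj), so the stored JSON is just a string
# holding the object's repr. The original structure cannot be recovered.
lossy = json.loads(json.dumps(note, default=str))

# Converting to plain JSON types first keeps the structure intact.
faithful = json.loads(json.dumps(asdict(note)))

# Deep copy on write: mutating the caller's object afterwards does not
# change what was stored.
original = {"tags": ["a"]}
stored = copy.deepcopy(original)
original["tags"].append("b")
```

After running this, `lossy` is a plain string, `faithful` is a dict with the original fields, and `stored` still holds a single tag.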

Source code in lazybridge/store/__init__.py
def write(self, key: str, value: Any, *, agent_id: str | None = None) -> None:
    """Store ``value`` under ``key``.

    **Copy semantics (in-memory path)**: the value is deep-copied
    before storage via :func:`_deep_copy_safe` so that callers
    mutating the original object after ``write()`` cannot silently
    alter the stored value and defeat :meth:`compare_and_swap`.
    The SQLite path gets equivalent isolation for free via the
    JSON round-trip.

    Pydantic instances are serialised via ``model_dump(mode="json")``
    before JSON encoding — otherwise SQLite storage would fall
    through ``default=str`` and persist the model's ``__repr__``
    (e.g. ``"x=42 name='hello'"``), which is NOT round-trippable.
    In-memory storage keeps the instance as-is since Python dicts
    don't need JSON round-tripping.
    """
    if self._db:
        serialised = _to_jsonable(value)
        self._conn().execute(
            "INSERT OR REPLACE INTO store (key, value, written_at, agent_id) VALUES (?,?,?,?)",
            (key, json.dumps(serialised, default=str), time.time(), agent_id),
        )
        self._conn().commit()
    else:
        with self._lock:
            self._mem[key] = StoreEntry(key=key, value=_deep_copy_safe(value), agent_id=agent_id)

read

read(key: str, default: Any = None) -> Any

Return the value stored under key, or default.

Copy semantics (in-memory path): returns a deep copy of the stored value via _deep_copy_safe, so that callers mutating the returned object cannot silently alter the stored value and defeat compare_and_swap. The SQLite path returns a fresh object on every call because json.loads always allocates new containers.

Source code in lazybridge/store/__init__.py
def read(self, key: str, default: Any = None) -> Any:
    """Return the value stored under ``key``, or ``default``.

    **Copy semantics (in-memory path)**: returns a deep copy of the
    stored value via :func:`_deep_copy_safe` so that callers mutating
    the returned object cannot silently alter the stored value and
    defeat :meth:`compare_and_swap`.  The SQLite path returns a fresh
    object on every call because ``json.loads`` always allocates new
    containers.
    """
    if self._db:
        row = self._conn().execute("SELECT value FROM store WHERE key=?", (key,)).fetchone()
        return json.loads(row["value"]) if row else default
    with self._lock:
        entry = self._mem.get(key)
        return _deep_copy_safe(entry.value) if entry else default

items

items(*, prefix: str | None = None) -> list[tuple[str, Any]]

Return (key, value) pairs, optionally restricted to keys starting with prefix.

Uses an indexed B-tree range scan on the SQLite path (a single WHERE key >= ? AND key < ? query) — sub-linear in the total keyspace. The in-memory path filters under the store lock.

Parameters

prefix: Optional key prefix to filter by. None (default) or "" returns every pair in the store. Any other string restricts the results to keys starting with that prefix.
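The range-scan idea can be sketched as follows. `prefix_upper_bound` is a hypothetical reimplementation written for this example; the library's `_prefix_upper_bound` may differ in detail. It returns the smallest string that sorts after every key starting with the prefix, or None when no finite bound exists.

```python
from __future__ import annotations

import sqlite3


def prefix_upper_bound(prefix: str) -> str | None:
    """Smallest string greater than every string that starts with `prefix`."""
    chars = list(prefix)
    while chars:
        code = ord(chars[-1])
        if code < 0x10FFFF:
            nxt = code + 1
            if 0xD800 <= nxt <= 0xDFFF:
                nxt = 0xE000  # skip surrogates, which cannot be encoded as UTF-8
            chars[-1] = chr(nxt)
            return "".join(chars)
        chars.pop()  # last char is already the maximum: carry to the left
    return None  # empty prefix, or every character is U+10FFFF


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE store (key TEXT PRIMARY KEY, value TEXT)")
conn.executemany(
    "INSERT INTO store VALUES (?, ?)",
    [("plan.a", "1"), ("plan.b", "2"), ("plan/", "3"), ("plans", "4"), ("other", "5")],
)

upper = prefix_upper_bound("plan.")
rows = conn.execute(
    "SELECT key FROM store WHERE key >= ? AND key < ? ORDER BY key",
    ("plan.", upper),
).fetchall()
keys = [r[0] for r in rows]
```

Because the primary key is indexed, SQLite can answer the half-open range with an index seek instead of scanning every row, which is the point of computing the upper bound in Python first.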

Source code in lazybridge/store/__init__.py
def items(self, *, prefix: str | None = None) -> list[tuple[str, Any]]:
    """Return ``(key, value)`` pairs, optionally restricted to keys starting
    with ``prefix``.

    Uses an indexed B-tree range scan on the SQLite path (a single
    ``WHERE key >= ? AND key < ?`` query) — sub-linear in the total
    keyspace.  The in-memory path filters under the store lock.

    Parameters
    ----------
    prefix:
        Optional key prefix to filter by. ``None`` (default) or ``""``
        returns every pair in the store. Any other string restricts the
        results to keys starting with that prefix.
    """
    if self._db:
        if not prefix:
            rows = self._conn().execute("SELECT key, value FROM store").fetchall()
        else:
            upper = _prefix_upper_bound(prefix)
            if upper is not None:
                rows = (
                    self._conn()
                    .execute(
                        "SELECT key, value FROM store WHERE key >= ? AND key < ?",
                        (prefix, upper),
                    )
                    .fetchall()
                )
            else:
                # All characters in the prefix are U+10FFFF — no finite upper
                # bound; fall back to a full scan with Python startswith filter.
                rows = [
                    r
                    for r in self._conn().execute("SELECT key, value FROM store").fetchall()
                    if r["key"].startswith(prefix)
                ]
        return [(r["key"], json.loads(r["value"])) for r in rows]
    with self._lock:
        return [(k, _deep_copy_safe(v.value)) for k, v in self._mem.items() if not prefix or k.startswith(prefix)]

compare_and_swap

compare_and_swap(key: str, expected: Any, new: Any, *, agent_id: str | None = None) -> bool

Atomically set key to new only if its current value deep-equals expected.

expected=None means "the key must not currently exist" (a missing key compares equal to None).

Returns True on success, False when another writer has moved the value since the caller's last read. The caller is expected to treat this as a lost race (raise, back off, re-read, etc.). Comparison uses the JSON-normalised shape (via _to_jsonable), so Pydantic models and nested dicts compare the same as they do on disk and survive SQLite round-trips.

SQLite path: wraps the read-check-write in BEGIN IMMEDIATE so concurrent writers serialise on the SQLite reserved lock instead of interleaving. Exceptions are caught as (sqlite3.Error, ValueError) — the ValueError arm covers json.JSONDecodeError (a subclass) raised by json.loads on corrupt rows; without it, a corrupt row would leave the BEGIN IMMEDIATE transaction open on the thread-local connection and poison every subsequent call on that thread.
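A typical caller wraps compare_and_swap in an optimistic retry loop: read, compute, attempt the swap, and start over on False. The sketch below uses `TinyStore`, a hypothetical in-memory stand-in with the same read / compare_and_swap shape, so it runs without the library; `increment` is likewise an illustrative helper.

```python
import copy
import threading


class TinyStore:
    """Hypothetical in-memory stand-in exposing read / compare_and_swap."""

    def __init__(self):
        self._mem = {}
        self._lock = threading.Lock()

    def read(self, key, default=None):
        with self._lock:
            return copy.deepcopy(self._mem.get(key, default))

    def compare_and_swap(self, key, expected, new):
        with self._lock:
            # A missing key compares equal to None.
            if self._mem.get(key) != expected:
                return False
            self._mem[key] = copy.deepcopy(new)
            return True


def increment(store, key, retries=1000):
    """Optimistic update: re-read and retry whenever the CAS loses a race."""
    for _ in range(retries):
        current = store.read(key)  # None when the key does not exist yet
        new = (current or 0) + 1
        if store.compare_and_swap(key, current, new):
            return new
    raise RuntimeError(f"gave up on {key!r} after {retries} attempts")


def worker(store, n):
    for _ in range(n):
        increment(store, "hits")


store = TinyStore()
threads = [threading.Thread(target=worker, args=(store, 50)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
total = store.read("hits")
```

Every failed swap means some other writer succeeded in the meantime, so the loop always makes global progress; the retry limit only guards against pathological contention.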

Source code in lazybridge/store/__init__.py
def compare_and_swap(
    self,
    key: str,
    expected: Any,
    new: Any,
    *,
    agent_id: str | None = None,
) -> bool:
    """Atomically set ``key`` to ``new`` only if its current value
    deep-equals ``expected``.

    ``expected=None`` means "the key must not currently exist" (a
    missing key compares equal to ``None``).

    Returns ``True`` on success, ``False`` when another writer has
    moved the value since the caller's last read — the caller is
    expected to treat this as a lost race (raise, back off, re-read,
    etc.).  Comparison uses the JSON-normalised shape (via
    :func:`_to_jsonable`), so Pydantic models / nested dicts compare
    the same as they do on disk and survive SQLite round-trips.

    SQLite path: wraps the read-check-write in ``BEGIN IMMEDIATE``
    so concurrent writers serialise on the SQLite reserved lock
    instead of interleaving.  Exceptions are caught as
    ``(sqlite3.Error, ValueError)`` — the ``ValueError`` arm covers
    ``json.JSONDecodeError`` (a subclass) raised by ``json.loads``
    on corrupt rows; without it, a corrupt row would leave the
    ``BEGIN IMMEDIATE`` transaction open on the thread-local
    connection and poison every subsequent call on that thread.
    """
    if self._closed:
        raise RuntimeError("Store is closed")
    if not self._db:
        with self._lock:
            entry = self._mem.get(key)
            cur = entry.value if entry else None
            if not _json_eq(cur, expected):
                return False
            self._mem[key] = StoreEntry(key=key, value=_deep_copy_safe(new), agent_id=agent_id)
            return True

    # SQLite path — serialise concurrent writers via reserved lock.
    conn = self._conn()
    serialised_new = json.dumps(_to_jsonable(new), default=str)
    with self._lock:
        try:
            conn.execute("BEGIN IMMEDIATE")
            row = conn.execute("SELECT value FROM store WHERE key=?", (key,)).fetchone()
            cur = json.loads(row["value"]) if row else None
            if not _json_eq(cur, expected):
                conn.execute("ROLLBACK")
                return False
            conn.execute(
                "INSERT OR REPLACE INTO store (key, value, written_at, agent_id) VALUES (?,?,?,?)",
                (key, serialised_new, time.time(), agent_id),
            )
            conn.commit()
            return True
        except (sqlite3.Error, ValueError):
            # Catch both sqlite3.Error (transaction/IO failures) and
            # ValueError/JSONDecodeError (corrupt JSON in the row).
            # Without the ValueError arm, a corrupt row causes
            # json.loads to raise inside the BEGIN IMMEDIATE block,
            # leaving the transaction open on the thread-local
            # connection and poisoning every subsequent call on that thread.
            try:
                conn.execute("ROLLBACK")
            except sqlite3.Error as rollback_exc:
                import warnings as _warnings

                _warnings.warn(
                    f"Store.compare_and_swap: ROLLBACK after error failed "
                    f"({type(rollback_exc).__name__}: {rollback_exc}).  "
                    f"Discarding the thread-local connection so the next "
                    f"call gets a fresh one.",
                    UserWarning,
                    stacklevel=3,
                )
                self._discard_thread_conn()
            raise

At-rest encryption

EncryptedStoreAdapter wraps any Store and encrypts values before they hit the inner store; reads decrypt transparently. Keys are not encrypted — Store treats keys as opaque routing strings, and encrypting them would break iteration / __contains__.

Install the optional extra:

pip install 'lazybridge[encryption]'

The extra pulls in the cryptography package; the default install stays pure-Python.

from cryptography.fernet import Fernet
from lazybridge.store import Store
from lazybridge.store.encryption import EncryptedStoreAdapter

key = Fernet.generate_key()  # persist somewhere safe (KMS, env var, etc.)
store = EncryptedStoreAdapter(Store(db="state.sqlite"), key=key)

store.write("agent.notes", {"draft": "secret thoughts"})
# SQLite row holds an `lb-enc-v1::` Fernet token, not the JSON.

store.read("agent.notes")
# {"draft": "secret thoughts"}

For key rotation, pass a list of keys to get MultiFernet semantics: the first key encrypts new writes, and every key is tried on read:

store = EncryptedStoreAdapter(Store(...), key=[new_key, old_key])

lazybridge.store.encryption.EncryptedStoreAdapter

EncryptedStoreAdapter(inner: Store, *, key: bytes | list[bytes])

Encrypt values written to an inner Store.

Parameters

inner: The Store to wrap. Any Store flavour works (in-memory or SQLite-backed); the adapter doesn't care about persistence.

key: A Fernet key (bytes) or a list of keys. A list activates cryptography.fernet.MultiFernet for rotation: encryption uses the first key, and decryption tries each key in order until one succeeds.

Raises

ImportError: If cryptography isn't installed. Install lazybridge[encryption] to add it.

ValueError: If key is empty or not bytes / a list of bytes.

Source code in lazybridge/store/encryption.py
def __init__(self, inner: Store, *, key: bytes | list[bytes]) -> None:
    try:
        from cryptography.fernet import Fernet, MultiFernet
    except ImportError as e:
        raise ImportError(
            "EncryptedStoreAdapter requires the 'cryptography' package.\n"
            "  Install it via the encryption extra:\n"
            "    pip install 'lazybridge[encryption]'\n"
            "  Or directly:\n"
            "    pip install cryptography"
        ) from e

    if isinstance(key, bytes):
        self._fernet: Fernet | MultiFernet = Fernet(key)
    elif isinstance(key, list):
        if not key:
            raise ValueError("EncryptedStoreAdapter: key list is empty.")
        if not all(isinstance(k, bytes) for k in key):
            raise ValueError(
                f"EncryptedStoreAdapter: every key in the rotation list must be bytes; "
                f"got types {[type(k).__name__ for k in key]}."
            )
        self._fernet = MultiFernet([Fernet(k) for k in key])
    else:
        raise ValueError(f"EncryptedStoreAdapter: key must be bytes or list[bytes], got {type(key).__name__}.")

    self._inner = inner

items

items(*, prefix: str | None = None) -> list[tuple[str, Any]]

Return (key, decrypted-value) pairs, optionally restricted to keys starting with prefix.

Source code in lazybridge/store/encryption.py
def items(self, *, prefix: str | None = None) -> list[tuple[str, Any]]:
    """Return ``(key, decrypted-value)`` pairs, optionally restricted to
    keys starting with ``prefix``."""
    return [(k, self._decrypt(v)) for k, v in self._inner.items(prefix=prefix)]

compare_and_swap

compare_and_swap(key: str, expected: Any, new: Any, *, agent_id: str | None = None) -> bool

CAS over the decrypted value.

Fernet tokens embed a random IV and a timestamp, so two encryptions of the same plaintext are practically never equal. Passing a freshly encrypted expected value to the inner Store's CAS would therefore always fail. Instead, the adapter decrypts the current ciphertext, compares it against expected in plaintext space, and then writes the new encrypted value only if the ciphertext has not changed since the read.

This double-check is what makes the adapter race-safe: another writer may have updated the value between our decrypt and the inner CAS attempt, in which case the inner CAS returns False and we propagate that.
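The two-step check can be illustrated without the cryptography package. In this sketch `toy_encrypt` / `toy_decrypt` are NOT encryption: they only prepend random bytes so that equal plaintexts yield different tokens, which is the one property that matters here. `Inner` and `encrypted_cas` are hypothetical names.

```python
import base64
import json
import os


def toy_encrypt(value):
    """NOT encryption: random bytes plus JSON, to mimic non-deterministic tokens."""
    nonce = os.urandom(8)
    return base64.b64encode(nonce + json.dumps(value).encode()).decode()


def toy_decrypt(token):
    return json.loads(base64.b64decode(token)[8:].decode())


class Inner:
    """Hypothetical single-threaded inner store holding opaque tokens."""

    def __init__(self):
        self.data = {}

    def read(self, key):
        return self.data.get(key)

    def compare_and_swap(self, key, expected, new):
        if self.data.get(key) != expected:
            return False
        self.data[key] = new
        return True


def encrypted_cas(inner, key, expected, new):
    token = inner.read(key)
    plain = None if token is None else toy_decrypt(token)
    if plain != expected:  # step 1: compare in plaintext space
        return False
    # Step 2: hand the token we actually read to the inner CAS, so a write
    # landing between the read and this call makes the swap fail.
    return inner.compare_and_swap(key, token, toy_encrypt(new))


inner = Inner()
created = encrypted_cas(inner, "k", None, {"n": 1})
stale = encrypted_cas(inner, "k", {"n": 0}, {"n": 2})
updated = encrypted_cas(inner, "k", {"n": 1}, {"n": 2})
tokens_differ = toy_encrypt({"n": 1}) != toy_encrypt({"n": 1})
final_plain = toy_decrypt(inner.read("k"))
```

The inner store never sees plaintext; it only compares opaque tokens, and token equality is enough to detect an intervening write.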

Source code in lazybridge/store/encryption.py
def compare_and_swap(
    self,
    key: str,
    expected: Any,
    new: Any,
    *,
    agent_id: str | None = None,
) -> bool:
    """CAS over the decrypted value.

    Fernet ciphertexts embed a nonce, so two encryptions of the
    same plaintext are never equal.  Delegating to the inner
    Store's CAS would therefore always fail.  We decrypt the
    current ciphertext, compare against ``expected`` in plaintext
    space, then write the new encrypted value — *as long as* the
    ciphertext hasn't changed since the read.

    This double-check is what makes the adapter race-safe: another
    writer may have updated the value between our decrypt and the
    inner CAS attempt, in which case the inner CAS returns
    ``False`` and we propagate that.
    """
    current_token = self._inner.read(key)
    if current_token is None:
        current_plain = None
    else:
        current_plain = self._decrypt(current_token)

    if not _plain_eq(current_plain, expected):
        return False

    return self._inner.compare_and_swap(
        key,
        expected=current_token,
        new=self._encrypt(new),
        agent_id=agent_id,
    )