MACP

Session Discovery

From SDK 0.3.0 the SDK wraps the runtime's ListSessions and WatchSessions RPCs. Together they let orchestrators and supervisor agents enumerate active sessions and react to CREATED / RESOLVED / EXPIRED lifecycle events without polling GetSession.

For the underlying RPC contracts (request/response shapes, scoping rules), see Runtime API § Discovery and § Streaming Watches.

When to use

  • Supervisor dashboards — show every OPEN session for a tenant and their terminal outcomes as they occur.
  • Late-joining orchestrators — recover in-flight sessions after a restart without keeping an out-of-band session registry.
  • Reconciliation — cross-check your control-plane's view of sessions against what the runtime actually accepted.

list_sessions (snapshot)

from macp_sdk import AuthConfig, MacpClient

client = MacpClient(
    target="runtime:50051",
    auth=AuthConfig.for_bearer("tok-ops", expected_sender="ops"),
)
client.initialize()

for meta in client.list_sessions():
    print(
        meta.session_id,
        meta.mode,
        meta.state,
        meta.context_id or "-",
        list(meta.extension_keys),
    )

Each entry is a SessionMetadata proto with the same shape returned by GetSession — including the projected context_id and extension_keys fields that surface any extension blobs the initiator attached to SessionStart.extensions (see Protocol → SessionStart).

SessionLifecycleWatcher (live stream)

from macp_sdk import SessionLifecycleWatcher

watcher = SessionLifecycleWatcher(client)

for event in watcher.changes():
    print(event.event_type, event.session.session_id)
    if event.is_terminal:
        # This session will not emit more events.
        ...

event.event_type is a short string ("CREATED", "RESOLVED", "EXPIRED"). Convenience predicates:

PredicateTrue for
event.is_createdCREATED
event.is_resolvedRESOLVED (Commitment accepted)
event.is_expiredEXPIRED (TTL / CancelSession)
event.is_terminalRESOLVED or EXPIRED

Startup snapshot semantics

The runtime emits an initial CREATED event for every session that is already OPEN at subscribe time, then live events thereafter. That means a freshly-started supervisor sees every in-flight session without a separate list_sessions() call:

for event in SessionLifecycleWatcher(client).changes():
    if event.is_created:
        register(event.session)   # fires once per pre-existing session, plus every new one
    elif event.is_terminal:
        finalise(event.session)

list_sessions() is still useful when you want a bounded snapshot without holding the stream open.

Blocking handler form

watch(handler) is shorthand for a blocking for-loop:

def on_event(ev):
    dashboard.update(ev.session.session_id, ev.event_type)

SessionLifecycleWatcher(client).watch(on_event)  # blocks

Threading

The watcher reads from a gRPC server-streaming RPC on the caller's thread. Run it in a background thread or process if your agent also needs to send envelopes on the same client:

import threading

def run_watcher():
    for ev in SessionLifecycleWatcher(client).changes():
        handle(ev)

threading.Thread(target=run_watcher, daemon=True).start()

Authorisation

Both RPCs require the same Bearer auth as any other SDK call — the runtime scopes results to the authenticated identity. An agent only sees sessions it is a participant in, plus any sessions its token is authorised to observe via runtime config.