Stream agent events in real time

Split one SSE stream into the assistant answer and a live timeline of agent activity.

What you’ll build

A consumer for the Rank Server-Sent Events (SSE) stream that turns a single connection into two distinct surfaces: the assistant answer and a live activity timeline. The same pattern underlies every streaming UI in the platform and works identically for general chat and for pentest execution.

The golden rule

A stream interleaves two kinds of messages:

  • content events carry the assistant’s answer, token by token. Concatenate these, in order, and nothing else.
  • agent_event messages describe what the agent is doing — thinking, calling tools, spawning sub-agents. Render them as a timeline. Never append them to the answer.

Sub-agent text is the classic trap: it arrives as an agent_event (text_chunk, depth > 0) and must stay in the timeline. Only the main agent’s content is the answer.
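The golden rule can be sketched without the SDK. `SSE` and `Agent` below are hypothetical stand-ins that mirror only the ServerSentEvent and AgentEvent members described on this page; the lowercase event_type strings and the "text" key in the text_chunk data dict are assumptions, not the SDK's actual values.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Agent:
    """Hypothetical stand-in for the SDK's AgentEvent."""
    event_type: str
    depth: int = 0
    data: dict = field(default_factory=dict)


@dataclass
class SSE:
    """Hypothetical stand-in for the SDK's ServerSentEvent."""
    type: str
    content: Optional[str] = None
    agent_event: Optional[Agent] = None

    @property
    def is_agent_event(self) -> bool:
        return self.type == "agent_event" and self.agent_event is not None


def split_stream(events):
    """Return (answer, timeline): content builds the answer, agent events the timeline."""
    answer_parts: list[str] = []
    timeline: list[str] = []
    for event in events:
        if event.type == "content":
            answer_parts.append(event.content or "")
        elif event.is_agent_event:
            ev = event.agent_event
            # Sub-agent text (text_chunk at depth > 0) lands here, never in the answer.
            timeline.append("  " * ev.depth + ev.event_type)
    return "".join(answer_parts), timeline


events = [
    SSE("agent_event", agent_event=Agent("tool_call", depth=0)),
    # Hypothetical "text" key: sub-agent prose that must NOT reach the answer.
    SSE("agent_event", agent_event=Agent("text_chunk", depth=1, data={"text": "sub-agent notes"})),
    SSE("content", content="The answer "),
    SSE("content", content="arrives in fragments."),
]
answer, timeline = split_stream(events)
print(answer)    # The answer arrives in fragments.
print(timeline)  # ['tool_call', '  text_chunk']
```

The sub-agent's text_chunk shows up only as a timeline entry; the answer is built exclusively from content events.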

The ServerSentEvent shape

Each message is exposed by the SDK as a ServerSentEvent with these helpers:

Member                Description
event.type            "content", "agent_event", "complete", "error", "cancelled", "phase_start", …
event.content         The text fragment, for content events.
event.error           The error string, for error events.
event.metadata        The extra data object (phase info, totals, …).
event.is_agent_event  True when the message wraps an AgentEvent.
event.agent_event     The typed AgentEvent (event_type, agent_id, depth, data).
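The complete event closes the stream, so a consumer usually wants to record how the stream ended. A minimal sketch, assuming that "error" and "cancelled" are terminal as well (this page states it only for "complete") and treating an iterator that ends with no terminal event as a dropped connection. `Evt`, `track_outcome`, the "disconnected" label, and the metadata keys in the example are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Optional

# Assumption: only "complete" is documented as closing the stream.
TERMINAL_TYPES = {"complete", "error", "cancelled"}


@dataclass
class Evt:
    """Hypothetical stand-in exposing the ServerSentEvent members used here."""
    type: str
    error: Optional[str] = None
    metadata: dict = field(default_factory=dict)


def track_outcome(events):
    """Return (outcome, detail) for the first terminal event, ignoring later ones."""
    for event in events:
        if event.type in TERMINAL_TYPES:
            # Errors carry a string; other terminal events carry metadata (totals, ...).
            detail = event.error if event.type == "error" else event.metadata
            return event.type, detail
    # The iterator ended without a terminal event: treat as a dropped connection.
    return "disconnected", None


print(track_outcome([Evt("phase_start", metadata={"phase": "recon"}), Evt("complete")]))
# ('complete', {})
print(track_outcome([Evt("error", error="rate limited")]))
# ('error', 'rate limited')
```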

An AgentEvent exposes event_type (e.g. AgentEvent.TOOL_CALL), agent_id, depth (0 for the main agent, greater than 0 for sub-agents), instance_id for grouping, and an event-specific data dict.
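One way to use instance_id is to bucket timeline events per agent instance, for example to render each sub-agent in its own collapsible panel. This sketch assumes instance_id identifies a single agent run, so events sharing it belong together; `AgentEv`, `group_by_instance`, the instance IDs, and the lowercase event_type strings are hypothetical stand-ins for the SDK's types and constants.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class AgentEv:
    """Hypothetical stand-in mirroring the AgentEvent fields on this page."""
    event_type: str
    agent_id: int
    depth: int
    instance_id: str
    data: dict = field(default_factory=dict)


def group_by_instance(agent_events):
    """Bucket events per instance_id, preserving arrival order within each bucket."""
    groups: dict[str, list[AgentEv]] = defaultdict(list)
    for ev in agent_events:
        groups[ev.instance_id].append(ev)
    return groups


evs = [
    AgentEv("agent_start", agent_id=7, depth=0, instance_id="main"),
    AgentEv("subagent_spawn", agent_id=7, depth=0, instance_id="main", data={"subagent_id": "s1"}),
    AgentEv("tool_call", agent_id=9, depth=1, instance_id="s1", data={"tool_name": "search"}),
    AgentEv("agent_finished", agent_id=7, depth=0, instance_id="main"),
]
for instance, items in group_by_instance(evs).items():
    print(instance, [e.event_type for e in items])
# main ['agent_start', 'subagent_spawn', 'agent_finished']
# s1 ['tool_call']
```

Because defaultdict keeps insertion order, groups appear in the order their first event arrived.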

Steps

  1. Open a stream. A general agent runs a long-form loop and emits both content and agent_event messages.

    stream = client.ai.chat.stream(
        agent_id=agent_id,
        agent_type="general",
        user_prompt="Research recent critical CVEs for nginx and summarize them.",
    )
  2. Route each event. Append content to the answer, render agent events as activity, and surface errors separately.

    answer_parts = []
    with stream as events:
        for event in events:
            if event.type == "content":
                # Only content events build the answer.
                answer_parts.append(event.content)
            elif event.is_agent_event:
                ev = event.agent_event
                if ev is None:
                    continue
                # Indent sub-agent activity (depth > 0) under its parent.
                print("  " * ev.depth + "[activity]", ev.event_type)
            elif event.type == "error":
                print("error:", event.error)
  3. Read the typed payload. Switch on event_type to render a meaningful line per event.

    ev = event.agent_event
    if ev.event_type == rank.AgentEvent.TOOL_CALL:
        print("->", ev.data.get("tool_name"), ev.data.get("tool_args"))
    elif ev.event_type == rank.AgentEvent.AGENT_FINISHED:
        print("done:", ev.data.get("stop_reason"))
  4. Finalize. The complete event closes the stream; assemble the answer from the collected content fragments.

    print("".join(answer_parts))
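As more event types need custom rendering, the if/elif chain in step 3 can be replaced with a dispatch table. A sketch that uses plain strings in place of the rank.AgentEvent constants (their actual values are not documented here, so these keys are hypothetical); the data keys match the ones used on this page, and unlisted types fall back to the raw event type, as format_activity does in the full script.

```python
from typing import Callable

# Hypothetical string keys standing in for rank.AgentEvent.TOOL_CALL, etc.
FORMATTERS: dict[str, Callable[[dict], str]] = {
    "tool_call": lambda d: f"-> {d.get('tool_name')} {d.get('tool_args', {})}",
    "tool_result": lambda d: f"<- {d.get('tool_name')} ({d.get('duration_ms', 0)}ms)",
    "agent_finished": lambda d: f"done: {d.get('stop_reason')}",
}


def format_line(event_type: str, data: dict) -> str:
    """Look up a formatter by event type; unknown types render as their name."""
    formatter = FORMATTERS.get(event_type)
    return formatter(data) if formatter else event_type


print(format_line("tool_call", {"tool_name": "search", "tool_args": {"q": "nginx"}}))
# -> search {'q': 'nginx'}
print(format_line("thinking", {}))
# thinking
```

Adding support for a new event type then means adding one dictionary entry rather than another branch.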

Run it

Install the SDK with pip install rank-sdk, save the following as streaming_agent_events.py, set RANK_API_KEY, then run python streaming_agent_events.py.

"""Consume the Rank SSE stream and render an agent-activity timeline.

This is a focused deep-dive on the streaming protocol. It talks to a general
agent (`agent_type="general"`) running a long-form ReAct loop and demonstrates
how to split a single stream into two distinct surfaces:

  * The ANSWER  -- the concatenation of every `content` event, in order.
  * The TIMELINE -- one line per `agent_event` (thinking, tool calls,
    subagents, ...), which describes what the agent is doing but is NEVER
    part of the answer.

Each Server-Sent Event is exposed by the SDK as a `ServerSentEvent` with these
helpers:

    event.type            -> "content" | "agent_event" | "complete" | ...
    event.content         -> the text fragment for "content" events
    event.error           -> the error string for "error" events
    event.metadata        -> the extra "data" object (phase info, totals, ...)
    event.is_agent_event  -> True when the message carries an AgentEvent
    event.agent_event     -> the typed AgentEvent (event_type, agent_id, depth, data)

Run:
    pip install rank-sdk
    export RANK_API_KEY=rk_...
    python streaming_agent_events.py

Optional environment variables:
    RANK_AGENT_ID  ID of the general agent to use. If unset, the first
                   available general agent is selected automatically.
    RANK_PROMPT    Prompt to send (default: a CVE research question).
"""

from __future__ import annotations

import os

import rank

PROMPT = os.environ.get(
    "RANK_PROMPT",
    "Research the most relevant recent critical CVEs for nginx and summarize them.",
)

# Indentation per nesting depth so sub-agent activity reads as a tree.
INDENT = "    "


def resolve_agent_id(client: rank.Rank) -> int:
    """Return the configured general agent, or the first one available."""
    configured = os.environ.get("RANK_AGENT_ID")
    if configured:
        return int(configured)

    agents = client.agents.list(type="general")
    if not agents.items:
        raise SystemExit("No general agents available on this account.")
    return agents.items[0].id


def format_activity(ev: rank.AgentEvent) -> str:
    """Render a single agent event as one timeline line."""
    data = ev.data
    if ev.event_type == rank.AgentEvent.AGENT_START:
        return f"agent #{ev.agent_id} started ({data.get('model', '?')})"
    if ev.event_type == rank.AgentEvent.ITERATION_START:
        return f"iteration {data.get('iteration')}/{data.get('max_iterations')}"
    if ev.event_type == rank.AgentEvent.THINKING:
        return "thinking..."
    if ev.event_type == rank.AgentEvent.TOOL_CALL:
        return f"-> tool {data.get('tool_name')} {data.get('tool_args', {})}"
    if ev.event_type == rank.AgentEvent.TOOL_RESULT:
        return (f"<- tool {data.get('tool_name')} "
                f"({data.get('duration_ms', 0)}ms)")
    if ev.event_type == rank.AgentEvent.SUBAGENT_SPAWN:
        return f"spawned subagent {data.get('subagent_id')}: {data.get('mission', '')}"
    if ev.event_type == rank.AgentEvent.SUBAGENT_COMPLETE:
        return f"subagent {data.get('subagent_id')} finished"
    if ev.event_type == rank.AgentEvent.AGENT_FINISHED:
        return (f"agent #{ev.agent_id} finished "
                f"({data.get('stop_reason')}, {data.get('iterations')} iter)")
    return ev.event_type or "event"


def main() -> None:
    with rank.Rank() as client:
        agent_id = resolve_agent_id(client)
        print(f"Streaming from general agent #{agent_id}\n")

        answer_parts: list[str] = []

        with client.ai.chat.stream(
            agent_id=agent_id,
            agent_type="general",
            user_prompt=PROMPT,
        ) as stream:
            for event in stream:
                if event.type == "content":
                    # Build the answer from content events only.
                    answer_parts.append(event.content)

                elif event.is_agent_event:
                    ev = event.agent_event
                    if ev is None:
                        continue
                    # Sub-agent events (depth > 0) are indented under the parent.
                    prefix = INDENT * ev.depth
                    print(f"  [activity] {prefix}{format_activity(ev)}")

                elif event.type == "error":
                    print(f"  [error] {event.error}")

                elif event.type == "complete":
                    print("  [stream complete]")

        print("\n===== ASSISTANT ANSWER =====\n")
        print("".join(answer_parts).strip() or "(no answer received)")


if __name__ == "__main__":
    try:
        main()
    except rank.AuthenticationError:
        print("ERROR: invalid or missing API key. Set RANK_API_KEY.")
    except rank.APIError as exc:
        print(f"API error ({exc.status_code}): {exc.message}")
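The script buffers the whole answer and prints it once the stream closes. For a terminal UI that shows the answer while it is generated, one option (not part of the SDK) is to write content fragments to stdout with a flush and send timeline lines to stderr, so that redirecting stdout to a file captures only the answer. A sketch using a simplified, hypothetical (kind, payload) feed instead of ServerSentEvent objects; `render_live` is an illustrative name.

```python
import io
import sys


def render_live(events, answer_out=sys.stdout, timeline_out=sys.stderr):
    """Write answer fragments as they arrive and timeline lines on a separate stream.

    `events` is a hypothetical simplified feed of (kind, payload) tuples:
    ("content", text) or ("agent_event", (depth, label)).
    """
    for kind, payload in events:
        if kind == "content":
            answer_out.write(payload)
            answer_out.flush()  # show each fragment immediately
        elif kind == "agent_event":
            depth, label = payload
            timeline_out.write("    " * depth + label + "\n")
    answer_out.write("\n")


answer_buf, timeline_buf = io.StringIO(), io.StringIO()
render_live(
    [("agent_event", (0, "thinking...")), ("content", "Hello, "),
     ("agent_event", (1, "-> tool search")), ("content", "world.")],
    answer_out=answer_buf,
    timeline_out=timeline_buf,
)
print(repr(answer_buf.getvalue()))    # 'Hello, world.\n'
print(repr(timeline_buf.getvalue()))  # 'thinking...\n    -> tool search\n'
```

Passing the output streams as parameters keeps the function testable with io.StringIO, as in the usage above.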

The same routing logic powers the pentest recipes (see Run your first automated pentest). For the full event catalog, see the Streaming guide.