Stream agent events in real time
Split one SSE stream into the assistant answer and a live timeline of agent activity.
What you’ll build
A consumer for the Rank Server-Sent Events stream that produces two distinct surfaces from a single connection: the assistant answer and a live activity timeline. This is the foundation behind every streaming UI in the platform, and it works the same way for general chat and for pentest execution.
The golden rule
A stream interleaves two kinds of messages:
contentevents carry the assistant’s answer, token by token. Concatenate these, in order, and nothing else.agent_eventmessages describe what the agent is doing — thinking, calling tools, spawning sub-agents. Render them as a timeline. Never append them to the answer.
Sub-agent text is the classic trap: it arrives as an agent_event (text_chunk, depth > 0)
and must stay in the timeline. Only the main agent’s content is the answer.
The ServerSentEvent shape
Each message is exposed by the SDK as a ServerSentEvent with these helpers:
| Member | Description |
|---|---|
event.type | "content", "agent_event", "complete", "error", "cancelled", "phase_start", … |
event.content | The text fragment, for content events. |
event.error | The error string, for error events. |
event.metadata | The extra data object (phase info, totals, …). |
event.is_agent_event | True when the message wraps an AgentEvent. |
event.agent_event | The typed AgentEvent (event_type, agent_id, depth, data). |
An AgentEvent exposes event_type (e.g. AgentEvent.TOOL_CALL), agent_id, depth
(0 for the main agent, 1 for a sub-agent), instance_id for grouping, and an
event-specific data dict.
Steps
-
Open a stream. A general agent runs a long-form loop and emits both
contentandagent_eventmessages.stream = client.ai.chat.stream( agent_id=agent_id, agent_type="general", user_prompt="Research recent critical CVEs for nginx and summarize them.", ) -
Route each event. Append
contentto the answer; render everything else as activity.answer_parts = [] with stream as events: for event in events: if event.type == "content": answer_parts.append(event.content) elif event.is_agent_event: ev = event.agent_event print(" " * ev.depth, "[activity]", ev.event_type) elif event.type == "error": print("error:", event.error) -
Read the typed payload. Switch on
event_typeto render a meaningful line per event.ev = event.agent_event if ev.event_type == rank.AgentEvent.TOOL_CALL: print("->", ev.data.get("tool_name"), ev.data.get("tool_args")) elif ev.event_type == rank.AgentEvent.AGENT_FINISHED: print("done:", ev.data.get("stop_reason")) -
Finalize. The
completeevent closes the stream; assemble the answer from the collectedcontentfragments.print("".join(answer_parts))
Run it
Save the following as streaming_agent_events.py, set RANK_API_KEY, then run
python streaming_agent_events.py.
"""Consume the Rank SSE stream and render an agent-activity timeline.
This is a focused deep-dive on the streaming protocol. It talks to a general
agent (`agent_type="general"`) running a long-form ReAct loop and demonstrates
how to split a single stream into two distinct surfaces:
* The ANSWER -- the concatenation of every `content` event, in order.
* The TIMELINE -- one line per `agent_event` (thinking, tool calls,
subagents, ...), which describes what the agent is doing but is NEVER
part of the answer.
Each Server-Sent Event is exposed by the SDK as a `ServerSentEvent` with these
helpers:
event.type -> "content" | "agent_event" | "complete" | ...
event.content -> the text fragment for "content" events
event.error -> the error string for "error" events
event.metadata -> the extra "data" object (phase info, totals, ...)
event.is_agent_event -> True when the message carries an AgentEvent
event.agent_event -> the typed AgentEvent (event_type, agent_id, data)
Run:
pip install rank-sdk
export RANK_API_KEY=rk_...
python streaming_agent_events.py
Optional environment variables:
RANK_AGENT_ID ID of the general agent to use. If unset, the first
available general agent is selected automatically.
RANK_PROMPT Prompt to send (default: a CVE research question).
"""
from __future__ import annotations
import os
import rank
PROMPT = os.environ.get(
"RANK_PROMPT",
"Research the most relevant recent critical CVEs for nginx and summarize them.",
)
# Indentation per nesting depth so sub-agent activity reads as a tree.
INDENT = " "
def resolve_agent_id(client: rank.Rank) -> int:
"""Return the configured general agent, or the first one available."""
configured = os.environ.get("RANK_AGENT_ID")
if configured:
return int(configured)
agents = client.agents.list(type="general")
if not agents.items:
raise SystemExit("No general agents available on this account.")
return agents.items[0].id
def format_activity(ev: rank.AgentEvent) -> str:
"""Render a single agent event as one timeline line."""
data = ev.data
if ev.event_type == rank.AgentEvent.AGENT_START:
return f"agent #{ev.agent_id} started ({data.get('model', '?')})"
if ev.event_type == rank.AgentEvent.ITERATION_START:
return f"iteration {data.get('iteration')}/{data.get('max_iterations')}"
if ev.event_type == rank.AgentEvent.THINKING:
return "thinking..."
if ev.event_type == rank.AgentEvent.TOOL_CALL:
return f"-> tool {data.get('tool_name')} {data.get('tool_args', {})}"
if ev.event_type == rank.AgentEvent.TOOL_RESULT:
return (f"<- tool {data.get('tool_name')} "
f"({data.get('duration_ms', 0)}ms)")
if ev.event_type == rank.AgentEvent.SUBAGENT_SPAWN:
return f"spawned subagent {data.get('subagent_id')}: {data.get('mission', '')}"
if ev.event_type == rank.AgentEvent.SUBAGENT_COMPLETE:
return f"subagent {data.get('subagent_id')} finished"
if ev.event_type == rank.AgentEvent.AGENT_FINISHED:
return (f"agent #{ev.agent_id} finished "
f"({data.get('stop_reason')}, {data.get('iterations')} iter)")
return ev.event_type or "event"
def main() -> None:
with rank.Rank() as client:
agent_id = resolve_agent_id(client)
print(f"Streaming from general agent #{agent_id}\n")
answer_parts: list[str] = []
with client.ai.chat.stream(
agent_id=agent_id,
agent_type="general",
user_prompt=PROMPT,
) as stream:
for event in stream:
if event.type == "content":
# Build the answer from content events only.
answer_parts.append(event.content)
elif event.is_agent_event:
ev = event.agent_event
if ev is None:
continue
# Sub-agent events (depth > 0) are indented under the parent.
prefix = INDENT * ev.depth
print(f" [activity] {prefix}{format_activity(ev)}")
elif event.type == "error":
print(f" [error] {event.error}")
elif event.type == "complete":
print(" [stream complete]")
print("\n===== ASSISTANT ANSWER =====\n")
print("".join(answer_parts).strip() or "(no answer received)")
if __name__ == "__main__":
try:
main()
except rank.AuthenticationError:
print("ERROR: invalid or missing API key. Set RANK_API_KEY.")
except rank.APIError as exc:
print(f"API error ({exc.status_code}): {exc.message}")
The same routing logic powers the pentest recipes — see Run your first automated pentest and the Streaming guide for the full event catalog.