Streaming
Consume Server-Sent Events from chat and pentest execution, follow agent activity, and attach files to a streaming request.
Real-time work in Rank is delivered over Server-Sent Events (SSE). Two methods open a stream:
- client.ai.chat.stream(...): send a prompt to an agent, or launch a pentest, and stream the response as it is produced.
- client.pentests.stream(pentest_id): reconnect to a pentest that is already running.
Both return a Stream on the synchronous client and an AsyncStream on the asynchronous one.
The object is both a context manager and an iterator of ServerSentEvent, and it closes itself
automatically when a terminal event (complete, error or cancelled) arrives.
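The closing rule can be pictured with a small stand-in. This is only a sketch, not the SDK's implementation: `until_terminal` and the plain-dict events are hypothetical, and the code models nothing more than "iteration ends once a terminal event has been delivered".

```python
from typing import Dict, Iterable, Iterator

# Terminal event types named in this guide.
TERMINAL_TYPES = frozenset({"complete", "error", "cancelled"})


def until_terminal(events: Iterable[Dict[str, str]]) -> Iterator[Dict[str, str]]:
    """Yield events up to and including the first terminal one (hypothetical helper)."""
    for event in events:
        yield event
        if event["type"] in TERMINAL_TYPES:
            return  # nothing after a terminal event is delivered


if __name__ == "__main__":
    # Made-up events; the last one is never reached.
    fake = [
        {"type": "content", "content": "Hel"},
        {"type": "content", "content": "lo"},
        {"type": "complete", "content": ""},
        {"type": "content", "content": "never seen"},
    ]
    print([e["type"] for e in until_terminal(fake)])
```

The terminal event itself is still yielded, which is why the examples in this guide can react to complete inside the loop.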
Sending a chat request
client.ai.chat.stream has a single required argument, user_prompt. The remaining arguments
select the flow:
- General chat: pass agent_id (and optionally chat_id to persist context).
- Pentest, automatic: pass pentest_id with mode="automatic".
- Pentest, guided: pass pentest_id with mode="guided" and phase_id.
| Argument | Type | Notes |
|---|---|---|
| user_prompt | str | The message to send. Required. |
| agent_id | int | Agent to use (required for general chat). |
| agent_type | "general" \| "pentest" | Inferred automatically when pentest_id is set. |
| chat_id | int | Persists messages and conversation context across modes. |
| pentest_id | int | Required for pentest modes. |
| mode | "guided" \| "automatic" | Pentest only. |
| phase_id | int | Required for guided mode. |
| context | list | Explicit conversation context (ChatMessage objects or dicts). |
| file / files | attachment(s) | A single attachment, or a list. Mutually exclusive. |
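The flow rules above can be checked before a request is sent. The sketch below is a hypothetical client-side helper (`build_stream_kwargs` is not part of the SDK) that encodes the three flows as this guide describes them. It assumes mode must always be given for a pentest, which may be stricter than what the service actually enforces.

```python
from typing import Any, Dict, Optional


def build_stream_kwargs(
    user_prompt: str,
    agent_id: Optional[int] = None,
    chat_id: Optional[int] = None,
    pentest_id: Optional[int] = None,
    mode: Optional[str] = None,
    phase_id: Optional[int] = None,
) -> Dict[str, Any]:
    """Validate a flow selection and return keyword arguments (hypothetical helper)."""
    if not user_prompt:
        raise ValueError("user_prompt is required")

    kwargs: Dict[str, Any] = {"user_prompt": user_prompt}
    if chat_id is not None:
        kwargs["chat_id"] = chat_id

    if pentest_id is None:
        # General chat: an agent must be named; pentest-only arguments are rejected.
        if agent_id is None:
            raise ValueError("general chat requires agent_id")
        if mode is not None or phase_id is not None:
            raise ValueError("mode and phase_id apply to pentests only")
        kwargs["agent_id"] = agent_id
        return kwargs

    # Pentest flows (assumption: mode is always spelled out).
    if mode not in ("automatic", "guided"):
        raise ValueError('mode must be "automatic" or "guided"')
    if mode == "guided" and phase_id is None:
        raise ValueError("guided mode requires phase_id")
    kwargs.update(pentest_id=pentest_id, mode=mode)
    if phase_id is not None:
        kwargs["phase_id"] = phase_id
    if agent_id is not None:
        kwargs["agent_id"] = agent_id
    return kwargs
```

The returned dictionary is meant to be unpacked into the call, for example client.ai.chat.stream(**kwargs), so a mistake such as a guided pentest without phase_id fails locally instead of after a round trip.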
Sync vs. async iteration
The synchronous client uses a plain with / for. The asynchronous client requires you to
await the stream(...) call first, then use async with / async for.
```python
# Synchronous client: plain with / for.
import rank

with rank.Rank() as client:
    with client.ai.chat.stream(
        agent_id=23,
        user_prompt="Summarize the latest critical Apache CVEs disclosed this month.",
        chat_id=14,
    ) as stream:
        for event in stream:
            if event.type == "content":
                print(event.content, end="", flush=True)
            elif event.type == "complete":
                print()
```

```python
# Asynchronous client: await stream(...), then async with / async for.
import asyncio

import rank


async def main():
    async with rank.AsyncRank() as client:
        async with await client.ai.chat.stream(
            agent_id=23,
            user_prompt="Summarize the latest critical Apache CVEs disclosed this month.",
            chat_id=14,
        ) as stream:
            async for event in stream:
                if event.type == "content":
                    print(event.content, end="", flush=True)
                elif event.type == "complete":
                    print()


asyncio.run(main())
```

The ServerSentEvent shape
Every iteration yields a ServerSentEvent. These are the properties you read:
| Property | Type | Description |
|---|---|---|
| event.type | str | Event type identifier (content, complete, agent_event, …). |
| event.content | str | Text content of the event. |
| event.error | str \| None | Error message (only for error events). |
| event.metadata | dict | Extra fields: progress %, vulnerability counts, elapsed time, etc. |
| event.timestamp | float \| None | Event timestamp. |
| event.raw_data | str | Raw JSON string before parsing. |
| event.is_agent_event | bool | True when the event carries an AgentEvent payload. |
| event.agent_event | AgentEvent \| None | Parsed AgentEvent (only when type == "agent_event"). |
| event.event_type | str \| None | Shortcut to agent_event.event_type. |
Common transport event types you will see, depending on the flow: content, queued, ready,
phase_start, phase_complete, phase_retry, processing_vulnerabilities,
vulnerabilities_complete, agent_event, complete, error, cancelled and reconnected.
The golden rule: content vs. agent_event
This is the single most important thing to get right when rendering a stream:
Only event.type == "content" builds the assistant’s answer. Events with event.type == "agent_event" describe what the agent is doing (thinking, calling tools, spawning sub-agents) and are meant for an activity / timeline view. Never concatenate agent_event text into the answer body.
If you only care about the final answer, handle content and ignore everything else. To also
render the live activity, branch on event.is_agent_event and read the typed payload from
event.agent_event.
```python
# Synchronous client
import rank
from rank import AgentEvent

with rank.Rank() as client:
    with client.ai.chat.stream(
        agent_id=23,
        user_prompt="Investigate this target end-to-end and summarize the exposure.",
        chat_id=14,
    ) as stream:
        for event in stream:
            if event.type == "content":
                # The actual answer: only this builds the message body.
                print(event.content, end="", flush=True)
            elif event.is_agent_event:
                ev = event.agent_event
                indent = "  " if ev.depth > 0 else ""  # sub-agents are nested
                if ev.event_type == AgentEvent.THINKING:
                    print(f"\n{indent}[thinking] {ev.data['content'][:80]}")
                elif ev.event_type == AgentEvent.TOOL_CALL:
                    print(f"\n{indent}[tool] {ev.data['tool_name']}")
                elif ev.event_type == AgentEvent.AGENT_FINISHED:
                    print(f"\n{indent}[done] {ev.data.get('stop_reason')}")
            elif event.type == "complete":
                print("\n[stream finished]")
```

```python
# Asynchronous client
import asyncio

import rank
from rank import AgentEvent


async def main():
    async with rank.AsyncRank() as client:
        async with await client.ai.chat.stream(
            agent_id=23,
            user_prompt="Investigate this target end-to-end and summarize the exposure.",
            chat_id=14,
        ) as stream:
            async for event in stream:
                if event.type == "content":
                    print(event.content, end="", flush=True)
                elif event.is_agent_event:
                    ev = event.agent_event
                    indent = "  " if ev.depth > 0 else ""
                    if ev.event_type == AgentEvent.TOOL_CALL:
                        print(f"\n{indent}[tool] {ev.data['tool_name']}")
                    elif ev.event_type == AgentEvent.AGENT_FINISHED:
                        print(f"\n{indent}[done] {ev.data.get('stop_reason')}")
                elif event.type == "complete":
                    print("\n[stream finished]")


asyncio.run(main())
```

The AgentEvent payload
When event.is_agent_event is True, event.agent_event returns a typed AgentEvent:
| Field | Type | Description |
|---|---|---|
| event_type | str | Sub-event identifier (e.g. thinking, tool_call, agent_finished). |
| agent_id | int \| str \| None | Numeric agent ID, "orchestrator", or None. |
| parent_agent_id | int \| None | Parent agent ID for sub-agents. |
| instance_id | str | Unique per-execution key; group an instance’s events by this. |
| depth | int | 0 = top-level agent, 1 = sub-agent. |
| iteration | int | Current iteration of the agent loop. |
| timestamp | float | Unix timestamp. |
| data | dict | Event-specific payload (read the keys relevant to event_type). |
Every sub-event name is available as a constant on the class — AgentEvent.TOOL_CALL,
AgentEvent.THINKING, AgentEvent.SUBAGENT_SPAWN, AgentEvent.AGENT_FINISHED, and so on — so
you can compare against constants instead of raw strings. Sub-agent text arrives as
agent_event with event_type == "text_chunk" and depth > 0: it is activity, not the answer.
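One way to keep the answer and the activity apart is to feed every event into a small state object. The sketch below runs without the SDK: `FakeEvent`, `FakeAgentEvent` and `RenderState` are hypothetical stand-ins that expose only the attributes described in this guide, and the sample events (including the tool name) are made up.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class FakeAgentEvent:
    """Stand-in exposing only the AgentEvent fields this sketch reads."""
    event_type: str
    instance_id: str
    depth: int = 0
    data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class FakeEvent:
    """Stand-in for ServerSentEvent."""
    type: str
    content: str = ""
    agent_event: Optional[FakeAgentEvent] = None

    @property
    def is_agent_event(self) -> bool:
        return self.agent_event is not None


@dataclass
class RenderState:
    """Answer text plus one activity lane per agent execution."""
    answer: str = ""
    timeline: Dict[str, List[str]] = field(default_factory=lambda: defaultdict(list))

    def feed(self, event: FakeEvent) -> None:
        if event.type == "content":
            self.answer += event.content  # only content builds the answer
        elif event.is_agent_event:
            ev = event.agent_event
            # Sub-agent text_chunk events land here too: activity, not answer.
            self.timeline[ev.instance_id].append(ev.event_type)


if __name__ == "__main__":
    state = RenderState()
    for e in [
        FakeEvent("agent_event", agent_event=FakeAgentEvent("tool_call", "a1", 0, {"tool_name": "port_scan"})),
        FakeEvent("content", "Open ports: "),
        FakeEvent("agent_event", agent_event=FakeAgentEvent("text_chunk", "s1", 1, {"content": "scanning"})),
        FakeEvent("content", "22, 80"),
    ]:
        state.feed(e)
    print(state.answer)
    print(dict(state.timeline))
```

Keying the timeline by instance_id rather than agent_id keeps two runs of the same agent in separate lanes, which matches the "unique per-execution key" description in the table.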
Full agent_event reference
The harness emits one agent_event per step. You only need to handle the ones your UI cares
about; unknown event_type values are safe to ignore. For the mechanics behind these events,
see Agent harness & long-running tasks.
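Because unknown event_type values are safe to ignore, a lookup table of handlers is a convenient shape for a UI. The following is a minimal sketch under that assumption; `HANDLERS` and `describe` are hypothetical names, and the data keys are the ones listed in the reference tables of this section.

```python
from typing import Any, Callable, Dict, Optional

Handler = Callable[[Dict[str, Any]], str]

# Only the sub-events this hypothetical UI cares about.
HANDLERS: Dict[str, Handler] = {
    "tool_call": lambda data: f"[tool] {data['tool_name']}",
    "tool_result": lambda data: f"[result] {data['tool_name']} in {data['duration_ms']} ms",
    "agent_finished": lambda data: f"[done] {data.get('stop_reason')}",
}


def describe(event_type: str, data: Dict[str, Any]) -> Optional[str]:
    """Return a timeline line for a known event_type, or None to skip it."""
    handler = HANDLERS.get(event_type)
    return handler(data) if handler is not None else None


if __name__ == "__main__":
    print(describe("tool_call", {"tool_name": "port_scan"}))
    print(describe("some_future_event", {}))  # unknown types are skipped
```

With the real SDK the call would presumably be describe(ev.event_type, ev.data) for each agent event; new event types added by the service then pass through without breaking the loop.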
Agent loop (a single agent working through its ReAct loop):
| event_type | Constant | Key data fields |
|---|---|---|
| agent_start | AGENT_START | model, agent_id, mission, max_iterations, tools |
| plan | PLAN | plan_text |
| iteration_start | ITERATION_START | iteration, max_iterations, tokens_used, stagnation |
| thinking | THINKING | content, streaming |
| tool_call | TOOL_CALL | tool_name, tool_args |
| tool_result | TOOL_RESULT | tool_name, result, duration_ms, skipped, cached, blocked |
| interpretation | INTERPRETATION | content |
| context_compaction | CONTEXT_COMPACTION | summarized_steps, summary_chars |
| shared_context | SHARED_CONTEXT | summary_length, summary_preview |
| nudge | NUDGE | nudge_count, unused_tools |
| progress | PROGRESS | iteration, tools_used, tokens_used |
| iteration_complete | ITERATION_COMPLETE | iteration_prompt, iteration_response, token counts |
| text_chunk | TEXT_CHUNK | content (sub-agents only, depth > 0) |
| subagent_spawn | SUBAGENT_SPAWN | subagent_id, mission, depth |
| subagent_complete | SUBAGENT_COMPLETE | subagent_id, result_preview, iterations |
| agent_finished | AGENT_FINISHED | stop_reason, iterations, input_tokens, output_tokens, elapsed_s |
Orchestration (automatic pentests, where agent_id is "orchestrator"):
| event_type | Constant | Key data fields |
|---|---|---|
| orchestration_start | ORCHESTRATION_START | agents, phase, total_agents |
| orchestration_status | ORCHESTRATION_STATUS | alive_agents, completed_agents, elapsed_time |
| agent_status_change | AGENT_STATUS_CHANGE | agent_id, status, reasoning |
| consolidation_start | CONSOLIDATION_START | agents_completed |
| consolidation_heartbeat | CONSOLIDATION_HEARTBEAT | elapsed_seconds, message |
| consolidation_complete | CONSOLIDATION_COMPLETE | input_tokens, output_tokens |
| orchestration_complete | ORCHESTRATION_COMPLETE | phase, total_agents, successful, failed |
| orchestration_cancelled | ORCHESTRATION_CANCELLED | message |
Browser agent (interactive web navigation): browser_agent_start (BROWSER_AGENT_START) with
mission, target_url, max_iterations. Its tool_call/tool_result events mask credentials.
The stop_reason carried by agent_finished is one of goal_reached, max_iterations,
timeout, budget, stagnation or cancelled, each available as a constant
(AgentEvent.STOP_GOAL_REACHED, AgentEvent.STOP_MAX_ITERATIONS, and so on).
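A UI usually wants to turn stop_reason into a status badge. The sketch below uses the raw string values listed above rather than the SDK constants so that it runs on its own. `classify_stop` is a hypothetical helper, and the grouping into success, limit and cancelled is a presentation choice rather than something the API defines.

```python
from typing import Optional, Tuple

# Values listed for agent_finished.stop_reason in this guide.
STOP_CATEGORIES = {
    "goal_reached": "success",
    "max_iterations": "limit",
    "timeout": "limit",
    "budget": "limit",
    "stagnation": "limit",
    "cancelled": "cancelled",
}


def classify_stop(stop_reason: Optional[str]) -> Tuple[str, str]:
    """Return (category, label) for a stop_reason; the categories are a UI choice."""
    if stop_reason is None:
        return ("unknown", "Agent finished")
    category = STOP_CATEGORIES.get(stop_reason, "unknown")
    label = stop_reason.replace("_", " ")
    return (category, f"Agent finished: {label}")


if __name__ == "__main__":
    for reason in ("goal_reached", "budget", None):
        print(classify_stop(reason))
```

Values that are not in the table fall back to the "unknown" category instead of raising, so a stop reason introduced later does not break rendering.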
Launching and reconnecting to a pentest
A pentest stream carries the same content and agent_event items plus pentest-specific
transport events such as phase_start, phase_complete and vulnerabilities_complete. Launch
one with client.ai.chat.stream(..., pentest_id=...), and reattach to a running one with
client.pentests.stream(pentest_id).
```python
# Synchronous client
import rank

with rank.Rank() as client:
    with client.pentests.stream(pentest_id=42) as stream:
        for event in stream:
            if event.type == "content":
                print(event.content, end="", flush=True)
            elif event.is_agent_event:
                ev = event.agent_event
                print(f"\n{ev.event_type}: {ev.data}")
            elif event.type == "phase_complete":
                print(f"\n[phase {event.metadata.get('phase_id')} done]")
            elif event.type == "complete":
                print("\nPentest finished!")
```

```python
# Asynchronous client
import asyncio

import rank


async def main():
    async with rank.AsyncRank() as client:
        async with await client.pentests.stream(pentest_id=42) as stream:
            async for event in stream:
                if event.type == "content":
                    print(event.content, end="", flush=True)
                elif event.is_agent_event:
                    ev = event.agent_event
                    print(f"\n{ev.event_type}: {ev.data}")
                elif event.type == "complete":
                    print("\nPentest finished!")


asyncio.run(main())
```

Attachments
Pass a single attachment with file or several with files (the two are mutually exclusive).
Each attachment can be a file-like object, raw bytes, a (filename, content) tuple, or a
(filename, content, content_type) tuple. Supported extensions are PDF, JSON, PNG, JPEG, WEBP
and GIF, with a limit of 10 files, 30 MB per file and 50 MB combined per request.
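These limits can be checked locally before uploading. The sketch below is a hypothetical pre-flight check for (filename, content) tuples and is not part of the SDK. It assumes the limits are binary megabytes (1 MB = 1024 * 1024 bytes) and that .jpg is accepted alongside .jpeg; neither detail is stated in this guide.

```python
import os
from typing import List, Sequence, Tuple

MB = 1024 * 1024  # assumption: the limits are binary megabytes
MAX_FILES = 10
MAX_FILE_BYTES = 30 * MB
MAX_TOTAL_BYTES = 50 * MB
# Assumption: ".jpg" is accepted alongside ".jpeg".
ALLOWED_EXTENSIONS = {".pdf", ".json", ".png", ".jpeg", ".jpg", ".webp", ".gif"}


def check_attachments(attachments: Sequence[Tuple[str, bytes]]) -> List[str]:
    """Return a list of problems for (filename, content) tuples; empty means OK."""
    problems: List[str] = []
    if len(attachments) > MAX_FILES:
        problems.append(f"too many files: {len(attachments)} > {MAX_FILES}")
    total = 0
    for name, content in attachments:
        ext = os.path.splitext(name)[1].lower()
        if ext not in ALLOWED_EXTENSIONS:
            problems.append(f"{name}: unsupported extension {ext or '(none)'}")
        if len(content) > MAX_FILE_BYTES:
            problems.append(f"{name}: larger than 30 MB")
        total += len(content)
    if total > MAX_TOTAL_BYTES:
        problems.append("combined size exceeds 50 MB")
    return problems


if __name__ == "__main__":
    print(check_attachments([("report.pdf", b"%PDF-1.7"), ("notes.txt", b"hello")]))
```

An empty result means the batch passes the local check; the service remains the authority on what it accepts.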
```python
import rank

with rank.Rank() as client:
    # Single attachment
    with open("report.pdf", "rb") as f:
        with client.ai.chat.stream(
            agent_id=23,
            user_prompt="Analyze this security report and summarize the findings",
            file=f,
        ) as stream:
            for event in stream:
                if event.type == "content":
                    print(event.content, end="", flush=True)

    # Multiple attachments
    with open("report.pdf", "rb") as a, open("diagram.png", "rb") as b:
        with client.ai.chat.stream(
            agent_id=23,
            user_prompt="Cross-reference these files and summarize the findings",
            files=[a, b],
        ) as stream:
            for event in stream:
                if event.type == "content":
                    print(event.content, end="", flush=True)
```

Once you can read a stream, see Errors & pagination for handling failures, or the Cookbook for a full streaming UI recipe.