Chapter 10 of 14 · Part 3: Run Your Agent

Chapter 10: The Event Stream — Watching Your Agent Think and Act Live

By the end of this chapter, you will understand every event type in the Managed Agents event stream, know how to process them in your code, and be able to build a basic streaming event handler.


The Big Idea

When Claude is running a session, it isn't operating in a black box. Every thought, every action, every tool call is broadcast as a structured event. You can watch your agent reason in real time — see it decide to search the web, observe it reading a file, catch it the moment it finishes work.

This visibility is what makes agents debuggable, trustworthy, and steerable. Without the event stream, you'd hand Claude a task and wait for a result, with no way to understand what happened in between. With the event stream, you see the work as it unfolds.

The event stream documentation describes the model this way: "Communication with Claude Managed Agents is event-based. You send user events to the agent, and receive agent and session events back to track status."

Events follow a {domain}.{action} naming convention. agent.message is an event from the agent domain with the message action. session.status_idle is a session-domain event indicating idle status. Once you see the pattern, the entire event vocabulary becomes predictable.

Event Stream — Observable Agent Activity A vertical cascading stream of event cards flowing downward like a terminal log. Events include user.message, agent.tool_use bash, tool_result, agent.tool_use web_search, tool_result, and session.status_idle. Each card fades in and scrolls to show the full observable event stream. Event Stream Everything the agent does is observable — server-sent events over SSE session event stream user.message T+0.0s "Write a Python script to analyze sales data" → kicks off session execution agent.message T+1.2s "I'll start by reading the CSV file…" agent narration / plan agent.tool_use bash T+2.1s $ python analyze.py --input sales.csv waiting for container execution… tool_result T+3.8s Total: $847,293 | Top region: West Coast (+23%) execute(bash, …) → string returned agent.tool_use web_search T+4.5s query: "Q4 2025 retail industry benchmarks" searching the web… tool_result T+5.9s Industry avg growth: 18% — your +23% beats benchmark context enriched session.status_idle T+7.1s stop_reason: end_turn agent finished — awaiting next user.message YOU SEND user events YOU RECEIVE agent/session events Events persisted server-side · fetch full history with events.list()
Live event stream visualization. A vertical timeline of events flowing downward, each as a labeled box with a type indicator and a preview of content. Example sequence: user.message → agent.message (partial) → agent.tool_use [web_search] → agent.tool_use [read] → agent.message (complete) → session.status_idle (end_turn). Each box is a different color by domain: purple for user, blue for agent, green for session. Caption: "This is what your agent is actually doing."

The Analogy

Think of the event stream like a flight's live tracking map.

Before live tracking, you booked a flight and just waited — you had no idea where the plane was or whether it was on time. When it landed, you found out.

With live tracking, you can see: departed on time, cruising altitude 35,000 feet, 40 minutes from destination, now in final descent. You know what's happening at every moment. If there's a delay, you see it before the gate agent announces it.

The event stream is live tracking for your agent. You can see: started the task, searched the web, found three relevant sources, is now reading the first one, is composing the summary. You know where the work is, what's going well, and — if something goes wrong — exactly where it went wrong.

DiagramFlight tracker metaphor. Top: stylized flight path with labeled waypoints matching agent events — "Took off" (user.message), "Cruising" (agent.tool_use series), "Approach" (agent.message), "Landed" (session.status_idle). Below the flight path: the equivalent event stream with real event type labels.

How It Actually Works

The Two Categories of Events

Events are bidirectional. You send events to the session; the session sends events back to you.

Events you send (user events):

Event type When you send it
user.message Start or continue a task
user.tool_confirmation Approve or deny a pending tool call
user.custom_tool_result Return the result of a custom tool your code executed
user.interrupt Stop the agent mid-task
user.define_outcome Define an outcome for the session (Research Preview)

Events the session sends back:

Event type What it means
agent.message Claude's text response
agent.tool_use Claude is invoking a built-in or MCP tool
agent.mcp_tool_use Claude is invoking an MCP tool specifically
agent.custom_tool_use Claude is invoking a custom tool (your code must execute it)
session.status_idle Session has paused; includes stop_reason
session.error An error occurred (e.g., MCP auth failure)

(Session event stream)

The session.status_idle Event

This is the most important event to handle correctly. It signals that the session has stopped running. The stop_reason field tells you why:

  • end_turn — The agent finished its work and is waiting for your next message
  • requires_action — The session is paused waiting for tool confirmation or custom tool result. The blocking event IDs are in stop_reason.requires_action.event_ids.

When you see end_turn, the work is done — read the output, evaluate it, decide whether to send another task or close the session.

When you see requires_action, your code needs to respond before the agent can continue.

The Core Streaming Pattern

The fundamental pattern for interacting with a session:

with client.beta.sessions.events.stream(session.id) as stream:
    # Send the user message after the stream opens
    client.beta.sessions.events.send(
        session.id,
        events=[
            {
                "type": "user.message",
                "content": [
                    {
                        "type": "text",
                        "text": "Create a Python script that generates the first 20 Fibonacci numbers and saves them to fibonacci.txt",
                    },
                ],
            },
        ],
    )

    # Process streaming events
    for event in stream:
        match event.type:
            case "agent.message":
                for block in event.content:
                    print(block.text, end="")
            case "agent.tool_use":
                print(f"\n[Using tool: {event.name}]")
            case "session.status_idle":
                print("\n\nAgent finished.")
                break

(Quickstart)

Critical order: Open the stream first (with client.beta.sessions.events.stream(session.id) as stream:), then send. "The with block opens the SSE connection; anything you send inside the block is guaranteed to be observable. Sending before opening risks losing events that fire in the race window." (Cookbook)

Handling Custom Tool Calls

When a custom tool is invoked, the session pauses with requires_action. Your code must execute the tool and send back the result:

with client.beta.sessions.events.stream(session.id) as stream:
    for event in stream:
        if event.type == "session.status_idle" and (stop := event.stop_reason):
            match stop.type:
                case "requires_action":
                    for event_id in stop.event_ids:
                        # Look up the custom tool use event and execute it
                        tool_event = events_by_id[event_id]
                        result = call_tool(tool_event.name, tool_event.input)

                        # Send the result back
                        client.beta.sessions.events.send(
                            session.id,
                            events=[
                                {
                                    "type": "user.custom_tool_result",
                                    "custom_tool_use_id": event_id,
                                    "content": [{"type": "text", "text": result}],
                                },
                            ],
                        )
                case "end_turn":
                    break

(Session event stream)

The pattern here: when you get requires_action, loop through the event IDs in stop.event_ids. For each, look up the corresponding event, execute the tool in your code, and send back a user.custom_tool_result.

Handling Tool Confirmation Requests

When a tool with always_ask policy fires, the same requires_action stop reason appears:

with client.beta.sessions.events.stream(session.id) as stream:
    for event in stream:
        if event.type == "session.status_idle" and (stop := event.stop_reason):
            match stop.type:
                case "requires_action":
                    for event_id in stop.event_ids:
                        # Approve the pending tool call
                        client.beta.sessions.events.send(
                            session.id,
                            events=[
                                {
                                    "type": "user.tool_confirmation",
                                    "tool_use_id": event_id,
                                    "result": "allow",
                                },
                            ],
                        )
                case "end_turn":
                    break

(Session event stream)

The difference between custom tool handling and permission confirmation:

  • Custom tool: you receive agent.custom_tool_use, execute the tool yourself, send user.custom_tool_result
  • Permission confirmation: you receive agent.tool_use, decide allow/deny, send user.tool_confirmation

The Timestamp: processed_at

Every event includes a processed_at timestamp indicating when the event was recorded server-side. If processed_at is null, the event has been queued by the harness and will be handled after preceding events finish processing. (Session event stream)

This matters for debugging: events with null processed_at are queued but not yet executed. Events with timestamps are part of the committed log.

Polling as an Alternative to Streaming

Streaming (SSE) is the primary pattern for watching events in real time. But for long-running background tasks where you don't want to hold a connection open, polling is a valid alternative:

"Polling wins in the opposite situation. It's stateless, survives process restarts, and composes cleanly with webhook handlers and queue workers that don't want to hold connections open." (Cookbook)

With polling, you periodically call events.list to fetch all events since the last check, rather than streaming them as they arrive. This works well for:

  • Webhook-based architectures where you're processing events in a queue worker
  • Batch jobs that run overnight and check status in the morning
  • Mobile apps that can't hold long-lived connections

Memory Tool Events

When memory stores are attached to a session, the agent's interactions with memory (reading and writing to memory stores) appear as agent.tool_use events in the stream. (Memory) You'll see events like agent.tool_use with name: "memory_read" or name: "memory_write" flowing through the same stream as other tool calls.

DiagramFull event stream diagram for a research task. Sequence from top to bottom: user.message → agent.message ("Let me search for that...") → agent.tool_use [web_search] → agent.tool_use [web_fetch] → agent.tool_use [write] → agent.message ("Here's what I found...") → session.status_idle (end_turn). Each event is a distinct color. The tool_use events have small annotation showing what the tool name and input fields contain.

Try it yourself

Try It Yourself

  1. Build a logging event handler. Modify the core streaming pattern to log all event types and timestamps:

    with client.beta.sessions.events.stream(session.id) as stream:
        client.beta.sessions.events.send(
            session.id,
            events=[{"type": "user.message", "content": [{"type": "text", "text": "Search the web for recent AI news and summarize three headlines."}]}],
        )
    
        for event in stream:
            print(f"[{event.type}] processed_at={getattr(event, 'processed_at', 'null')}")
            if event.type == "agent.message":
                for block in event.content:
                    print(f"  TEXT: {block.text[:100]}...")
            elif event.type == "agent.tool_use":
                print(f"  TOOL: {event.name}")
            elif event.type == "session.status_idle":
                print(f"  STOP: {event.stop_reason}")
                break
    
  2. Count the events. How many agent.tool_use events fired? How many were web_search vs. web_fetch? Did an agent.message appear before the tools, after, or both?

  3. Add a tool name filter. Modify the handler so it only prints agent.tool_use events where event.name == "web_search". This simulates building a monitoring system that only cares about specific tool usage.

  4. Build a simple requires_action handler. Create a session with bash on always_ask, give it a task that needs bash, and implement the confirmation flow:

    for event in stream:
        if event.type == "session.status_idle":
            stop = event.stop_reason
            if stop and stop.type == "requires_action":
                for event_id in stop.event_ids:
                    print(f"Approving event: {event_id}")
                    client.beta.sessions.events.send(
                        session.id,
                        events=[{"type": "user.tool_confirmation", "tool_use_id": event_id, "result": "allow"}],
                    )
            elif stop and stop.type == "end_turn":
                break
    
  5. Test polling vs. streaming. After a session finishes, fetch its event history without the stream:

    for event in client.beta.sessions.events.list(session.id):
        print(f"{event.type}: {event.processed_at}")
    

    Compare the complete history to what you observed in real time.

DiagramCode annotation diagram. The core streaming pattern from the Quickstart with callout boxes at each key line: (1) "Open stream FIRST — then send." (2) "The `with` block guarantees you won't miss events." (3) "match event.type — routes each event type to a handler." (4) "break on session.status_idle — this is your exit signal." Arrows connecting each annotation to the relevant line.

Common pitfalls

Common Pitfalls

  • Sending before streaming. The most common mistake: calling events.send() before events.stream() is open. If the agent starts responding in the gap between send and stream, you miss those events. Always open the stream inside the with block before sending.

  • Not handling requires_action in your event loop. If session.status_idle fires with stop_reason: requires_action and your event loop just breaks, the session is stuck waiting for your confirmation. You need to handle both end_turn and requires_action stop reasons.

  • Assuming all content blocks are text. The agent.message event content array can contain multiple blocks of different types. Always check block.type before accessing block.text.

  • Ignoring session.error events. If an MCP server fails authentication or a network error occurs, a session.error event fires. If your event loop only handles agent.* and session.status_idle events, you'll miss errors and the session behavior will be confusing.

  • Using streaming for long-running background jobs. Streaming requires holding a connection open. For tasks that run for hours, consider polling instead — it's stateless and survives process restarts.


Toolkit

Toolkit

  • Event Handler Boilerplate — Python — A complete, production-ready event handler with support for: text streaming, tool use logging, requires_action routing (both custom tools and permission confirmations), error handling, and session completion detection.

  • Event Type Quick Reference Card — All event types with their direction (sent vs. received), key fields, and when they fire. Single-page format for desk reference.


Chapter Recap

  • Events are bidirectional structured messages following a {domain}.{action} naming convention. You send user events; the session sends agent and session events back.
  • The session.status_idle event is the critical signal: check stop_reason to determine whether the task is done (end_turn) or waiting for your input (requires_action).
  • Always open the event stream before sending — this ensures you don't miss events that fire immediately after your message is delivered. For long-running background jobs, polling events.list is a stateless alternative to streaming.