LogoA2A Docs

Streaming

Implementing real-time streaming in the A2A protocol

Streaming in A2A

The A2A protocol supports real-time communication through streaming, allowing agents to send incremental updates to clients as they're generated. This is particularly useful for:

  • Providing immediate feedback during long-running operations
  • Displaying progressive generation of content
  • Creating responsive user experiences

Capability Declaration

Agents that support streaming should declare this capability in their agent card:

{
  "name": "your-agent-name",
  "description": "your-agent-description",
  // ... other fields
  "capabilities": {
    "streaming": true,
    "pushNotifications": false,
    "stateTransitionHistory": false
  }
  // ... other fields
}

Implementation Details

Server-Sent Events (SSE)

A2A uses HTTP with Server-Sent Events (SSE) for streaming. SSE is a standard that allows a server to push updates to a client over a single HTTP connection. Unlike WebSockets, SSE is unidirectional (server to client only), making it simpler to implement and well-suited for the agent-to-client update pattern.

Client Implementation

To receive streaming updates, clients send a request using the tasks/sendSubscribe method:

{
  "method":"tasks/sendSubscribe",
  "params": {
    "id": "de38c76d-d54c-436c-8b9f-4c2703648d64",
    "message": {
      "role":"user",
      "parts": [{
        "type":"text",
        "text": "Please generate a detailed report on climate change."
      }]
    },
    "metadata": {}
  }
}

The client then receives updates as they're generated by the agent.

Agent Implementation

Agents respond to streaming requests with a series of events. Each event is a JSON object prefixed with data::

data: {
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "id": "task-123",
    "status": {
      "state": "working",
      "timestamp":"2025-04-02T16:59:25.331844"
    },
    "final": false
  }
}

data: {
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "id": "task-123",
    "artifact": [
      "parts": [
        {"type":"text", "text": "Climate change is a global..."}
      ],
      "index": 0,
      "append": false,
      "lastChunk": false
    ]
  }
}

// Additional streaming updates...

data: {
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "id": "task-123",
    "status": {
      "state": "completed",
      "timestamp":"2025-04-02T16:59:45.331844"
    },
    "final": true
  }
}

Types of Streaming Updates

The A2A protocol supports two main types of streaming updates:

1. Status Updates

Status updates inform the client about the current state of the task:

{
  "id": "task-123",
  "status": {
    "state": "working",
    "timestamp":"2025-04-02T16:59:25.331844"
  },
  "final": false
}

Possible states include:

  • created - Task has been created but not yet processed
  • working - Agent is actively processing the task
  • input-required - Agent needs additional input to continue
  • completed - Task has been successfully completed
  • failed - Task processing has failed

2. Artifact Updates

Artifact updates provide incremental content generated by the agent:

{
  "id": "task-123",
  "artifact": [
    "parts": [
      {"type":"text", "text": "Content chunk..."}
    ],
    "index": 0,
    "append": true,
    "lastChunk": false
  ]
}

Key attributes:

  • append: Whether to append to an existing artifact (true) or create a new one (false)
  • lastChunk: Whether this is the final chunk of the artifact
  • index: The index of the artifact being updated (when multiple artifacts exist)

Handling Disconnections

If a client becomes disconnected during streaming, it can reconnect and resume receiving updates using the tasks/resubscribe method:

{
  "method":"tasks/resubscribe",
  "params": {
    "id": "de38c76d-d54c-436c-8b9f-4c2703648d64",
    "metadata": {}
  }
}

This allows clients to maintain the streaming connection even after temporary disruptions.

Best Practices

When implementing streaming in your A2A applications:

  1. Handle Connection Errors: Implement proper error handling and reconnection logic on the client side.

  2. Process Incremental Updates: Design your client to process and display partial results as they arrive.

  3. Monitor Connection Health: Implement heartbeat mechanisms to detect stale connections.

  4. Support Fallback Methods: Provide non-streaming alternatives for clients that don't support SSE.

  5. Optimize Chunk Size: Balance between small, frequent updates and larger, less frequent ones.

  6. Consider Bandwidth Limitations: Be mindful of clients on limited bandwidth connections.

  7. Implement Backpressure: Ensure your server can handle clients that process updates slowly.

Example: JavaScript Client Implementation

function subscribeToTask(taskId) {
  const eventSource = new EventSource(`/api/tasks/${taskId}/stream`);
  
  eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    
    // Handle status update
    if (data.result.status) {
      updateTaskStatus(data.result.status);
      
      // Close connection when task is complete
      if (data.result.final === true) {
        eventSource.close();
      }
    }
    
    // Handle artifact update
    if (data.result.artifact) {
      updateArtifact(data.result.artifact);
    }
  };
  
  eventSource.onerror = (error) => {
    console.error("EventSource failed:", error);
    eventSource.close();
    
    // Implement reconnection logic here
    setTimeout(() => {
      resubscribeToTask(taskId);
    }, 3000);
  };
  
  return eventSource;
}

Example: Server Implementation (Node.js)

app.get('/api/tasks/:taskId/stream', (req, res) => {
  const taskId = req.params.taskId;
  
  // Set up SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  
  // Send initial status
  const initialStatus = {
    id: taskId,
    status: {
      state: "working",
      timestamp: new Date().toISOString()
    },
    final: false
  };
  
  sendEvent(res, {
    jsonrpc: "2.0",
    id: 1,
    result: initialStatus
  });
  
  // Set up a task processor that will generate incremental updates
  const taskProcessor = createTaskProcessor(taskId);
  
  taskProcessor.on('update', (chunkData) => {
    sendEvent(res, {
      jsonrpc: "2.0",
      id: 1,
      result: {
        id: taskId,
        artifact: chunkData
      }
    });
  });
  
  taskProcessor.on('complete', () => {
    sendEvent(res, {
      jsonrpc: "2.0",
      id: 1,
      result: {
        id: taskId,
        status: {
          state: "completed",
          timestamp: new Date().toISOString()
        },
        final: true
      }
    });
    
    // End the response when the task is complete
    res.end();
  });
  
  // Handle client disconnection
  req.on('close', () => {
    taskProcessor.cleanup();
  });
});
 
function sendEvent(res, data) {
  res.write(`data: ${JSON.stringify(data)}\n\n`);
  res.flush(); // Ensure data is sent immediately
}

A2A Streaming Protocol Compatibility

When implementing streaming in your A2A agent, ensure you follow these protocol requirements:

  1. Endpoint Structure: Implement the /sessions/{sessionId}/stream endpoint for SSE communication.

  2. Status Codes: Use HTTP 200 for successful connection establishment.

  3. JSON-RPC Format: All streaming messages must follow the JSON-RPC 2.0 format with id, jsonrpc, and result fields.

  4. Proper Headers: Include all required SSE headers (Content-Type: text/event-stream, etc.).

  5. Heartbeat Messages: Send periodic keep-alive messages to maintain the connection (comment lines starting with :).

Conclusion

Streaming is a powerful feature of the A2A protocol that enables real-time, incremental updates from agents to clients. By implementing SSE-based streaming, you can create responsive applications that provide immediate feedback to users, especially during long-running operations or content generation tasks.

Whether you're developing agents that generate large amounts of content, perform complex calculations, or manage multi-step workflows, streaming provides a superior user experience compared to traditional request-response patterns.

For more information on the A2A protocol and its implementations, refer to the JSON Specification and other documentation resources.