A user types a question and clicks send. Without streaming, they stare at a blank screen for 5 to 10 seconds while the model generates the full response. With streaming, the first word appears within 200 milliseconds and text flows in token by token, just like ChatGPT. The total time is identical — but the perceived experience is completely different. Streaming is not a nice-to-have. For any user-facing LLM application, it is mandatory.
This lesson covers the full stack: how streaming works at the protocol level, how to implement it with OpenAI and Anthropic SDKs, how to build a streaming API with FastAPI, how to consume it on the frontend with React, and how to handle the production edge cases — errors mid-stream, client disconnects, backpressure, and reconnection.
Why Streaming Matters
Two metrics define user perception of LLM applications:
Time to First Token (TTFT): How long until the user sees the first word. Without streaming, TTFT equals total generation time (5-15 seconds for long responses). With streaming, TTFT is typically 200-500ms.
Tokens per Second (TPS): How fast text flows in after the first token. Most API providers deliver 30-80 tokens per second, which feels natural — roughly matching human reading speed.
| Metric | Without Streaming | With Streaming |
|---|---|---|
| Time to first word | 5-15 seconds | 200-500ms |
| Perceived wait time | Full generation time | Near-instant |
| User abandonment rate | High (>40% at 10s) | Low (<10%) |
| Cancel possible | No (wait or refresh) | Yes (stop anytime) |
Studies on web performance show that users start abandoning at 3 seconds of perceived inactivity. A 10-second wait for a full LLM response loses nearly half your users. Streaming keeps them engaged because they see progress immediately.
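Both metrics fall out of per-token arrival timestamps. A minimal sketch (the function name and return shape are illustrative, not from any SDK):

```python
def stream_metrics(request_start: float, token_timestamps: list[float]) -> dict:
    """Derive TTFT and TPS from the wall-clock times at which tokens arrived."""
    if not token_timestamps:
        return {"ttft_ms": None, "tokens_per_second": 0.0}
    ttft_ms = (token_timestamps[0] - request_start) * 1000
    elapsed = token_timestamps[-1] - request_start
    tps = len(token_timestamps) / elapsed if elapsed > 0 else 0.0
    return {"ttft_ms": round(ttft_ms, 1), "tokens_per_second": round(tps, 1)}
```

Record time.time() when the request is sent and on each arriving chunk; the load tester later in this lesson computes the same numbers.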
SSE vs WebSockets
Two protocols dominate real-time web communication. For LLM streaming, Server-Sent Events (SSE) is almost always the right choice.
| Feature | SSE | WebSockets |
|---|---|---|
| Direction | Server to client only | Bidirectional |
| Protocol | HTTP | Upgraded HTTP → WS |
| Proxy compatibility | Excellent (plain HTTP) | Mixed (some proxies break WS) |
| Reconnection | Built-in (auto-reconnect) | Manual implementation |
| Binary data | No (text only) | Yes |
| Complexity | Low | Medium-High |
| Browser support | All modern browsers | All modern browsers |
| Use case | LLM streaming, notifications | Chat, gaming, collaboration |
Use SSE when: The client sends a request and the server streams a response. This is the standard LLM interaction pattern. SSE is simpler, works through all proxies and CDNs, and has built-in reconnection.
Use WebSockets when: You need bidirectional streaming — for example, a voice chat application where audio flows both ways simultaneously, or a collaborative editing environment where multiple users stream changes.
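SSE itself is just plain text over HTTP: each event is one or more data: lines followed by a blank line. A sketch of the wire format and a minimal parser (the type/content JSON payload is this lesson's convention, not part of the SSE spec):

```python
import json

# Three events as they would appear in the raw HTTP response body
raw_sse = (
    'data: {"type": "token", "content": "Hello"}\n\n'
    'data: {"type": "token", "content": " world"}\n\n'
    'data: {"type": "done"}\n\n'
)

def parse_sse_body(body: str) -> list[dict]:
    """Split a complete SSE body into its JSON payloads."""
    events = []
    for block in body.split("\n\n"):
        for line in block.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events
```

Because the framing is this simple, any HTTP client that can read a response body incrementally can consume SSE — no special library required.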
Streaming with the OpenAI SDK
The OpenAI SDK supports streaming with a simple stream=True parameter.
Synchronous Streaming
from openai import OpenAI
client = OpenAI()
def stream_openai_sync(prompt: str) -> str:
"""Stream a response from OpenAI, printing tokens as they arrive."""
full_response = ""
stream = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt},
],
stream=True,
)
for chunk in stream:
# Each chunk has a choices array with delta content
delta = chunk.choices[0].delta
if delta.content:
print(delta.content, end="", flush=True)
full_response += delta.content
# Check for stop
if chunk.choices[0].finish_reason == "stop":
break
print() # Newline after streaming completes
return full_response
response = stream_openai_sync("Explain how TCP handshakes work in 3 sentences.")

Async Streaming
For production servers, you need async streaming to handle concurrent requests.
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def stream_openai_async(prompt: str):
"""Async streaming — for use in FastAPI/async frameworks."""
stream = await async_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt},
],
stream=True,
)
    # Note: an async generator cannot `return` a value — accumulate
    # the full response in the caller if you need it.
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content  # Yield each token as it arrives
# Usage in an async context
async def main():
async for token in stream_openai_async("What is gradient descent?"):
print(token, end="", flush=True)
print()
asyncio.run(main())

Streaming with the Anthropic SDK
Anthropic uses a context manager pattern for streaming, which ensures proper cleanup.
import anthropic
client = anthropic.Anthropic()
def stream_anthropic_sync(prompt: str) -> str:
"""Stream a response from Claude."""
full_response = ""
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
full_response += text
print()
return full_response
# Async version
async_client = anthropic.AsyncAnthropic()
async def stream_anthropic_async(prompt: str):
"""Async streaming from Claude."""
async with async_client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
) as stream:
async for text in stream.text_stream:
            yield text

The stream.text_stream iterator yields only the text deltas — no parsing of chunk objects needed. The context manager ensures the HTTP connection is properly closed even if an error occurs mid-stream.
Building a Streaming API with FastAPI
Here is a production-ready FastAPI endpoint that streams LLM responses to clients using Server-Sent Events.
import json
import time
import asyncio
import logging
from typing import AsyncGenerator
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from openai import AsyncOpenAI
logger = logging.getLogger(__name__)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_methods=["POST"],
allow_headers=["*"],
)
openai_client = AsyncOpenAI()
class ChatRequest(BaseModel):
message: str
system_prompt: str = "You are a helpful assistant."
model: str = "gpt-4o"
max_tokens: int = 2048
async def generate_sse_stream(
request: ChatRequest,
client_request: Request,
) -> AsyncGenerator[str, None]:
"""Generate SSE-formatted stream from OpenAI."""
try:
stream = await openai_client.chat.completions.create(
model=request.model,
messages=[
{"role": "system", "content": request.system_prompt},
{"role": "user", "content": request.message},
],
max_tokens=request.max_tokens,
stream=True,
)
token_count = 0
start_time = time.time()
async for chunk in stream:
# Check if client disconnected
if await client_request.is_disconnected():
logger.info("Client disconnected, stopping generation")
break
delta = chunk.choices[0].delta
if delta.content:
token_count += 1
# SSE format: "data: {json}\n\n"
event_data = json.dumps({
"type": "token",
"content": delta.content,
})
yield f"data: {event_data}\n\n"
if chunk.choices[0].finish_reason == "stop":
elapsed = time.time() - start_time
done_data = json.dumps({
"type": "done",
"token_count": token_count,
"elapsed_seconds": round(elapsed, 2),
"tokens_per_second": round(token_count / elapsed, 1)
if elapsed > 0 else 0,
})
yield f"data: {done_data}\n\n"
except Exception as e:
logger.error(f"Stream error: {e}")
error_data = json.dumps({
"type": "error",
"message": str(e),
})
yield f"data: {error_data}\n\n"
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest, client_request: Request):
"""SSE endpoint for streaming chat responses."""
return StreamingResponse(
generate_sse_stream(request, client_request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
},
)
# Non-streaming fallback
@app.post("/api/chat")
async def chat_non_streaming(request: ChatRequest):
"""Non-streaming endpoint for clients that don't support SSE."""
response = await openai_client.chat.completions.create(
model=request.model,
messages=[
{"role": "system", "content": request.system_prompt},
{"role": "user", "content": request.message},
],
max_tokens=request.max_tokens,
)
return {
"content": response.choices[0].message.content,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
},
}

Key details in this implementation:

- The text/event-stream media type tells the browser this is an SSE stream.
- X-Accel-Buffering: no prevents nginx from buffering the entire response before forwarding. Without this header, nginx collects all SSE events and sends them as one big batch — defeating the purpose of streaming.
- Cache-Control: no-cache prevents CDNs and browsers from caching the stream.
- Client disconnect detection using await client_request.is_disconnected() stops generation when the user navigates away or closes the tab, saving API costs.
- Structured SSE events with a type field so the frontend knows whether it is receiving a token, a completion signal, or an error.
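The structured events can be produced by a one-line formatter instead of hand-written f-strings; a sketch (the sse_event name is ours, not an API):

```python
import json

def sse_event(payload: dict) -> str:
    """Serialize one structured event in the SSE wire format: a 'data: {json}' line followed by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"
```

In the generator above, yield sse_event({"type": "token", "content": delta.content}) then replaces each manually assembled event string.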
Frontend Integration
Using the Fetch API with ReadableStream
The fetch API with ReadableStream gives you the most control. It works everywhere and handles SSE parsing cleanly.
// streamClient.js
export function streamChat(message, onToken, onDone, onError) {
  const controller = new AbortController();

  // Run the stream in the background so the abort function can be
  // returned immediately — if we awaited the whole stream first, the
  // caller could not cancel until generation had already finished.
  (async () => {
    try {
      const response = await fetch("/api/chat/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ message }),
        signal: controller.signal,
      });
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = "";
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        // Parse SSE events from buffer
        const lines = buffer.split("\n");
        buffer = lines.pop(); // Keep incomplete line in buffer
        for (const line of lines) {
          if (line.startsWith("data: ")) {
            const jsonStr = line.slice(6); // Remove "data: " prefix
            try {
              const event = JSON.parse(jsonStr);
              if (event.type === "token") {
                onToken(event.content);
              } else if (event.type === "done") {
                onDone(event);
              } else if (event.type === "error") {
                onError(new Error(event.message));
              }
            } catch (e) {
              // Skip malformed JSON
              console.warn("Failed to parse SSE event:", jsonStr);
            }
          }
        }
      }
    } catch (error) {
      if (error.name === "AbortError") {
        console.log("Stream aborted by user");
      } else {
        onError(error);
      }
    }
  })();

  // Return abort function so caller can cancel mid-stream
  return () => controller.abort();
}

React Component
// ChatStream.jsx
import { useState, useRef, useCallback } from "react";
import { streamChat } from "./streamClient";
export default function ChatStream() {
const [input, setInput] = useState("");
const [messages, setMessages] = useState([]);
const [isStreaming, setIsStreaming] = useState(false);
const abortRef = useRef(null);
const handleSubmit = useCallback(
async (e) => {
e.preventDefault();
if (!input.trim() || isStreaming) return;
const userMessage = input.trim();
setInput("");
setIsStreaming(true);
// Add user message
setMessages((prev) => [...prev, { role: "user", content: userMessage }]);
// Add empty assistant message (will be filled by streaming)
setMessages((prev) => [
...prev,
{ role: "assistant", content: "", streaming: true },
]);
// Start streaming
const abort = await streamChat(
userMessage,
// onToken — append each token to the assistant message
(token) => {
setMessages((prev) => {
const updated = [...prev];
const last = updated[updated.length - 1];
updated[updated.length - 1] = {
...last,
content: last.content + token,
};
return updated;
});
},
// onDone
(stats) => {
setMessages((prev) => {
const updated = [...prev];
updated[updated.length - 1] = {
...updated[updated.length - 1],
streaming: false,
stats,
};
return updated;
});
setIsStreaming(false);
},
// onError
(error) => {
console.error("Stream error:", error);
setMessages((prev) => {
const updated = [...prev];
updated[updated.length - 1] = {
...updated[updated.length - 1],
content:
updated[updated.length - 1].content +
"\n\n[Error: Stream interrupted]",
streaming: false,
error: true,
};
return updated;
});
setIsStreaming(false);
}
);
abortRef.current = abort;
},
[input, isStreaming]
);
const handleCancel = () => {
if (abortRef.current) {
abortRef.current();
setIsStreaming(false);
}
};
return (
<div style={{ maxWidth: 700, margin: "0 auto", padding: 20 }}>
<div style={{ minHeight: 400, marginBottom: 20 }}>
{messages.map((msg, i) => (
<div
key={i}
style={{
padding: "12px 16px",
marginBottom: 8,
borderRadius: 8,
background: msg.role === "user" ? "#e3f2fd" : "#f5f5f5",
whiteSpace: "pre-wrap",
}}
>
<strong>{msg.role === "user" ? "You" : "Assistant"}</strong>
<div>{msg.content}</div>
{msg.streaming && <span className="cursor-blink">|</span>}
{msg.stats && (
<div style={{ fontSize: 12, color: "#888", marginTop: 4 }}>
{msg.stats.token_count} tokens in {msg.stats.elapsed_seconds}s (
{msg.stats.tokens_per_second} tok/s)
</div>
)}
</div>
))}
</div>
<form onSubmit={handleSubmit} style={{ display: "flex", gap: 8 }}>
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="Type a message..."
disabled={isStreaming}
style={{ flex: 1, padding: "8px 12px", borderRadius: 6 }}
/>
{isStreaming ? (
<button type="button" onClick={handleCancel}>
Stop
</button>
) : (
<button type="submit">Send</button>
)}
</form>
</div>
);
}

Using the EventSource API
For simpler cases (GET requests, no custom headers), the native EventSource API is even easier:
function streamWithEventSource(query) {
// Note: EventSource only supports GET requests
const url = `/api/chat/stream?message=${encodeURIComponent(query)}`;
const source = new EventSource(url);
source.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === "token") {
document.getElementById("output").textContent += data.content;
} else if (data.type === "done") {
source.close();
}
};
source.onerror = (error) => {
console.error("EventSource error:", error);
source.close();
// EventSource auto-reconnects by default — close explicitly if you don't want that
};
return source; // caller can call source.close() to cancel
}

The limitation of EventSource is that it only supports GET requests — you cannot send a JSON body. For LLM applications that need to send conversation history, use fetch with ReadableStream instead.
Error Handling Mid-Stream
Streams can fail in many ways: the API provider has an outage, the network drops, the server runs out of memory, or the model hits a content filter mid-response. You must handle all of these gracefully.
import json
import asyncio
import logging
from enum import Enum

import openai  # needed for the RateLimitError/APIError handlers below
logger = logging.getLogger(__name__)
class StreamErrorType(Enum):
API_ERROR = "api_error"
TIMEOUT = "timeout"
CONTENT_FILTER = "content_filter"
RATE_LIMIT = "rate_limit"
NETWORK = "network_error"
UNKNOWN = "unknown"
async def resilient_stream(
request: ChatRequest,
client_request: Request,
max_retries: int = 2,
timeout_seconds: int = 60,
) -> AsyncGenerator[str, None]:
"""Stream with error recovery and timeout handling."""
retries = 0
accumulated_response = ""
while retries <= max_retries:
try:
stream = await asyncio.wait_for(
openai_client.chat.completions.create(
model=request.model,
messages=[
{"role": "system", "content": request.system_prompt},
{"role": "user", "content": request.message},
],
max_tokens=request.max_tokens,
stream=True,
),
timeout=timeout_seconds,
)
async for chunk in stream:
if await client_request.is_disconnected():
logger.info("Client disconnected")
return
delta = chunk.choices[0].delta
if delta.content:
accumulated_response += delta.content
yield f"data: {json.dumps({'type': 'token', 'content': delta.content})}\n\n"
if chunk.choices[0].finish_reason == "stop":
yield f"data: {json.dumps({'type': 'done'})}\n\n"
return
# Content filter triggered
if chunk.choices[0].finish_reason == "content_filter":
yield f"data: {json.dumps({'type': 'error', 'error_type': 'content_filter', 'message': 'Response filtered by content policy'})}\n\n"
return
except asyncio.TimeoutError:
logger.warning(f"Stream timeout after {timeout_seconds}s")
if accumulated_response:
# Partial response exists — send what we have
yield f"data: {json.dumps({'type': 'error', 'error_type': 'timeout', 'message': 'Generation timed out. Partial response delivered.'})}\n\n"
return
retries += 1
except openai.RateLimitError:
logger.warning("Rate limited, waiting before retry")
yield f"data: {json.dumps({'type': 'status', 'message': 'Rate limited, retrying...'})}\n\n"
await asyncio.sleep(2 ** retries) # Exponential backoff
retries += 1
except openai.APIError as e:
logger.error(f"API error: {e}")
if retries < max_retries:
retries += 1
yield f"data: {json.dumps({'type': 'status', 'message': 'Reconnecting...'})}\n\n"
await asyncio.sleep(1)
else:
yield f"data: {json.dumps({'type': 'error', 'error_type': 'api_error', 'message': f'API error after {max_retries} retries: {str(e)}'})}\n\n"
return
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
yield f"data: {json.dumps({'type': 'error', 'error_type': 'unknown', 'message': 'An unexpected error occurred'})}\n\n"
return
    yield f"data: {json.dumps({'type': 'error', 'error_type': 'max_retries', 'message': 'Failed after maximum retries'})}\n\n"

Backpressure Handling
Backpressure occurs when the server produces tokens faster than the client can consume them. In practice, this is rare for LLM streaming (models generate at 30-80 tokens/second, well within network capacity), but it matters when you are streaming to mobile clients on slow connections or when your server buffers multiple concurrent streams.
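The standard library already provides this producer-blocking behavior: a bounded asyncio.Queue suspends the producer on put() when full. A self-contained sketch (demo function and token values are illustrative):

```python
import asyncio

async def demo_bounded_queue() -> list[str]:
    """Show a bounded queue providing backpressure between producer and consumer."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=500)

    async def producer(tokens: list[str]):
        for tok in tokens:
            await queue.put(tok)   # suspends here whenever the queue is full
        await queue.put(None)      # sentinel: stream finished

    consumed: list[str] = []
    prod = asyncio.create_task(producer(["Hel", "lo", " world"]))
    while (tok := await queue.get()) is not None:
        consumed.append(tok)
    await prod
    return consumed
```

The hand-rolled buffer below makes the wait/notify mechanics explicit; asyncio.Queue is the shorter route when you do not need custom drop or timeout policies.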
import asyncio
from collections import deque
class BackpressureBuffer:
"""Buffer tokens with backpressure to prevent memory overflow."""
    def __init__(self, max_buffer_size: int = 1000):
        # Plain deque: deque(maxlen=...) would silently drop the oldest
        # token on append instead of applying backpressure.
        self.max_size = max_buffer_size
        self.buffer: deque[str] = deque()
        self.event = asyncio.Event()
        self.done = False

    async def put(self, token: str):
        """Add a token to the buffer. Waits if buffer is full."""
        while len(self.buffer) >= self.max_size:
            # Buffer full — wait for consumer to drain
            self.event.clear()
            try:
                await asyncio.wait_for(self.event.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                continue
        self.buffer.append(token)
        self.event.set()
async def get(self) -> str | None:
"""Get the next token. Returns None when stream is done."""
while not self.buffer and not self.done:
self.event.clear()
try:
await asyncio.wait_for(self.event.wait(), timeout=1.0)
except asyncio.TimeoutError:
continue
if self.buffer:
token = self.buffer.popleft()
self.event.set() # Notify producer that space is available
return token
return None
def finish(self):
self.done = True
self.event.set()
async def buffered_stream(request: ChatRequest) -> AsyncGenerator[str, None]:
"""Stream with backpressure buffer between producer and consumer."""
buffer = BackpressureBuffer(max_buffer_size=500)
async def producer():
"""Fetch tokens from API and put them in the buffer."""
stream = await openai_client.chat.completions.create(
model=request.model,
messages=[
{"role": "system", "content": request.system_prompt},
{"role": "user", "content": request.message},
],
stream=True,
)
async for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
await buffer.put(delta.content)
buffer.finish()
# Start producer in background
producer_task = asyncio.create_task(producer())
# Consumer yields tokens as SSE events
try:
while True:
token = await buffer.get()
if token is None:
break
yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
finally:
        producer_task.cancel()

Token Buffering: Word-Level Delivery
Raw LLM streaming delivers sub-word tokens. A user might see “Hel” then “lo,” then ” world” — which looks janky. Buffering to word boundaries produces smoother output.
async def word_buffered_stream(
raw_stream: AsyncGenerator[str, None],
) -> AsyncGenerator[str, None]:
"""Buffer tokens and yield at word boundaries for smoother display."""
buffer = ""
async for token in raw_stream:
buffer += token
# Yield complete words (text up to and including the last space)
last_space = buffer.rfind(" ")
if last_space > 0:
yield buffer[: last_space + 1]
buffer = buffer[last_space + 1 :]
# Flush remaining buffer
if buffer:
yield buffer
# Usage in SSE generator
async def smooth_sse_stream(request: ChatRequest) -> AsyncGenerator[str, None]:
stream = await openai_client.chat.completions.create(
model=request.model,
messages=[{"role": "user", "content": request.message}],
stream=True,
)
async def raw_tokens():
async for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
async for word_group in word_buffered_stream(raw_tokens()):
yield f"data: {json.dumps({'type': 'token', 'content': word_group})}\n\n"
    yield f"data: {json.dumps({'type': 'done'})}\n\n"

Structured Output and Streaming
When you need JSON output from a streaming LLM, you face a challenge: the JSON is invalid until the last closing brace arrives. Two approaches.
Approach 1: Stream the Full Text, Parse at the End
The simplest option. Show the user raw text as it streams, then parse the complete JSON when done.
async def stream_then_parse(request: ChatRequest) -> AsyncGenerator[str, None]:
"""Stream tokens to client, parse JSON from complete response."""
full_response = ""
stream = await openai_client.chat.completions.create(
model=request.model,
messages=[{"role": "user", "content": request.message}],
stream=True,
response_format={"type": "json_object"},
)
async for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
full_response += delta.content
yield f"data: {json.dumps({'type': 'token', 'content': delta.content})}\n\n"
# Parse the complete JSON
try:
parsed = json.loads(full_response)
yield f"data: {json.dumps({'type': 'done', 'parsed': parsed})}\n\n"
except json.JSONDecodeError as e:
        yield f"data: {json.dumps({'type': 'error', 'message': f'Invalid JSON: {e}'})}\n\n"

Approach 2: Incremental JSON Parsing
For long JSON responses, you can parse partial JSON to extract completed fields.
import json
from json import JSONDecodeError
class IncrementalJSONParser:
"""Parse JSON fields as they become complete in a stream."""
def __init__(self):
self.buffer = ""
self.extracted_fields = {}
def feed(self, token: str) -> dict | None:
"""Feed a token and return newly completed fields if any."""
self.buffer += token
# Try to parse completed key-value pairs
# This works for flat JSON objects
try:
# Attempt to parse by adding a closing brace
partial = self.buffer.rstrip().rstrip(",")
if not partial.endswith("}"):
partial += "}"
parsed = json.loads(partial)
# Find new fields
new_fields = {}
for key, value in parsed.items():
if key not in self.extracted_fields:
new_fields[key] = value
self.extracted_fields[key] = value
return new_fields if new_fields else None
except JSONDecodeError:
return None
# Usage
parser = IncrementalJSONParser()
# Simulating streaming tokens
tokens = ['{"', 'name', '": "', 'John', ' Doe', '", "', 'age', '": ', '30', ', "', 'city', '": "', 'NYC', '"}']
for token in tokens:
new_fields = parser.feed(token)
if new_fields:
print(f"New field(s) completed: {new_fields}")
# Output — name and age both complete only once age's value is parseable,
# and city completes when the closing brace arrives:
# New field(s) completed: {'name': 'John Doe', 'age': 30}
# New field(s) completed: {'city': 'NYC'}

Cancel/Abort: Letting Users Stop Generation
Users must be able to stop a response mid-stream. The frontend sends an abort signal, the backend detects the disconnection and stops calling the API.
Backend cancel detection (FastAPI):
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest, client_request: Request):
async def generate():
stream = await openai_client.chat.completions.create(
model=request.model,
messages=[{"role": "user", "content": request.message}],
stream=True,
)
async for chunk in stream:
# This is the critical check — stop generating if client is gone
if await client_request.is_disconnected():
logger.info("Client cancelled stream")
# Close the API stream to stop billing
await stream.close()
return
delta = chunk.choices[0].delta
if delta.content:
yield f"data: {json.dumps({'type': 'token', 'content': delta.content})}\n\n"
yield f"data: {json.dumps({'type': 'done'})}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")Frontend abort (already shown in the React component):
// The AbortController pattern
const controller = new AbortController();
fetch("/api/chat/stream", { signal: controller.signal });
// To cancel:
controller.abort(); // Closes the connection, triggers AbortError on frontend

When the frontend calls controller.abort(), the fetch connection closes. On the server side, client_request.is_disconnected() returns True, and we stop consuming from the OpenAI stream. This is important because OpenAI charges per token generated — if you do not close the stream, tokens continue generating (and billing) even though nobody is reading them.
Reconnection Logic
Network interruptions happen. A solid reconnection strategy maintains the user experience.
// reconnectingStream.js
export class ReconnectingStream {
constructor(url, options = {}) {
this.url = url;
this.options = options;
this.maxRetries = options.maxRetries || 3;
this.retryDelay = options.retryDelay || 1000;
this.onToken = options.onToken || (() => {});
this.onDone = options.onDone || (() => {});
this.onError = options.onError || (() => {});
this.onReconnect = options.onReconnect || (() => {});
this.retries = 0;
this.accumulatedText = "";
this.controller = null;
}
async start(body) {
this.body = body;
this.retries = 0;
await this._connect();
}
async _connect() {
this.controller = new AbortController();
try {
const response = await fetch(this.url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
...this.body,
// Tell server how much text we already have
resume_from: this.accumulatedText.length,
}),
signal: this.controller.signal,
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop();
for (const line of lines) {
if (line.startsWith("data: ")) {
const event = JSON.parse(line.slice(6));
if (event.type === "token") {
this.accumulatedText += event.content;
this.onToken(event.content);
} else if (event.type === "done") {
this.onDone(event);
return;
} else if (event.type === "error") {
throw new Error(event.message);
}
}
}
}
} catch (error) {
if (error.name === "AbortError") return; // User cancelled
if (this.retries < this.maxRetries) {
this.retries++;
const delay = this.retryDelay * Math.pow(2, this.retries - 1);
this.onReconnect(this.retries, delay);
await new Promise((r) => setTimeout(r, delay));
await this._connect();
} else {
this.onError(error);
}
}
}
cancel() {
if (this.controller) {
this.controller.abort();
}
}
}
// Usage
const stream = new ReconnectingStream("/api/chat/stream", {
maxRetries: 3,
retryDelay: 1000,
onToken: (token) => appendToUI(token),
onDone: (stats) => showComplete(stats),
onError: (err) => showError(err),
onReconnect: (attempt, delay) => showReconnecting(attempt),
});
stream.start({ message: "Explain quantum computing" });
// Cancel button:
document.getElementById("cancel").onclick = () => stream.cancel();

The resume_from parameter tells the server how much text the client already received. The server can skip ahead in the response or regenerate with a note to continue from the breakpoint. Full resume is complex — in practice, most applications simply restart the generation and rely on the retry being fast enough that the user does not notice.
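If you do implement server-side skipping, the bookkeeping is a small pure function. A hypothetical sketch (skip_already_sent is our name, not an API), assuming the server regenerates the response and drops characters the client reported having:

```python
def skip_already_sent(token: str, sent_so_far: int, resume_from: int) -> tuple[str, int]:
    """Return the part of `token` the client still needs, plus the new running total.

    `sent_so_far` counts characters of the regenerated response emitted so far;
    `resume_from` is the character count the client reported already having.
    """
    new_total = sent_so_far + len(token)
    if new_total <= resume_from:
        return "", new_total          # client already has this entire token
    if sent_so_far >= resume_from:
        return token, new_total       # past the resume point: send everything
    return token[resume_from - sent_so_far:], new_total  # token straddles the boundary
```

The caveat is that regeneration is not deterministic unless you pin a seed or cache the original completion, which is why most applications restart instead.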
Load Testing Streaming Endpoints
Streaming endpoints need different load testing strategies than regular APIs. You need to measure concurrent connection capacity, not just requests per second.
# load_test_streaming.py
import asyncio
import time
import json
import aiohttp
from dataclasses import dataclass
@dataclass
class StreamMetrics:
ttft_ms: float # Time to first token
total_ms: float
token_count: int
success: bool
error: str = ""
async def stream_one_request(
session: aiohttp.ClientSession,
url: str,
payload: dict,
) -> StreamMetrics:
"""Make one streaming request and measure metrics."""
start = time.time()
first_token_time = None
token_count = 0
try:
async with session.post(url, json=payload) as response:
if response.status != 200:
return StreamMetrics(0, 0, 0, False, f"HTTP {response.status}")
async for line in response.content:
decoded = line.decode().strip()
if decoded.startswith("data: "):
event = json.loads(decoded[6:])
if event["type"] == "token":
if first_token_time is None:
first_token_time = time.time()
token_count += 1
elif event["type"] == "done":
break
total = time.time() - start
ttft = (first_token_time - start) * 1000 if first_token_time else 0
return StreamMetrics(ttft, total * 1000, token_count, True)
except Exception as e:
total = time.time() - start
return StreamMetrics(0, total * 1000, token_count, False, str(e))
async def load_test(
url: str,
concurrent_users: int,
requests_per_user: int,
payload: dict,
):
"""Run a streaming load test."""
print(f"Load test: {concurrent_users} concurrent users, "
f"{requests_per_user} requests each")
print(f"URL: {url}")
print("-" * 60)
connector = aiohttp.TCPConnector(limit=concurrent_users)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = []
for _ in range(concurrent_users * requests_per_user):
tasks.append(stream_one_request(session, url, payload))
start = time.time()
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
# Analyze results
successes = [r for r in results if r.success]
failures = [r for r in results if not r.success]
if successes:
ttfts = [r.ttft_ms for r in successes]
totals = [r.total_ms for r in successes]
tokens = [r.token_count for r in successes]
print(f"\nResults ({len(results)} total requests in {elapsed:.1f}s):")
print(f" Successes: {len(successes)}")
print(f" Failures: {len(failures)}")
print(f"\n TTFT (ms): p50={sorted(ttfts)[len(ttfts)//2]:.0f} "
f"p95={sorted(ttfts)[int(len(ttfts)*0.95)]:.0f} "
f"p99={sorted(ttfts)[int(len(ttfts)*0.99)]:.0f}")
print(f" Total (ms): p50={sorted(totals)[len(totals)//2]:.0f} "
f"p95={sorted(totals)[int(len(totals)*0.95)]:.0f}")
print(f" Tokens/req: avg={sum(tokens)/len(tokens):.0f}")
print(f" Throughput: {len(successes)/elapsed:.1f} req/s")
if failures:
errors = {}
for f in failures:
errors[f.error] = errors.get(f.error, 0) + 1
print(f"\n Errors:")
for error, count in errors.items():
print(f" {error}: {count}")
# Run the load test
asyncio.run(load_test(
url="http://localhost:8000/api/chat/stream",
concurrent_users=20,
requests_per_user=5,
payload={"message": "What is 2+2?", "max_tokens": 100},
))

Key things to watch in load test results:
- TTFT p95 under 2 seconds — if 5% of users wait more than 2 seconds for the first token, your server is overloaded
- Connection failures — if aiohttp reports connection errors, your server's connection limit is too low
- Memory growth — monitor server RSS during the test. Each SSE connection holds state in memory; 1000 concurrent connections should not use more than a few hundred MB
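The sorted(...)[int(len * p)] indexing used in the load tester is the nearest-rank percentile; factoring it into a helper makes the p50/p95/p99 lines easier to read. A sketch:

```python
def percentile(values: list[float], pct: float) -> float:
    """Nearest-rank percentile (0.0 <= pct <= 1.0), clamped to the last element."""
    if not values:
        raise ValueError("percentile of empty list")
    ordered = sorted(values)
    idx = min(int(len(ordered) * pct), len(ordered) - 1)
    return ordered[idx]
```

The clamp matters: for pct=1.0 the raw index would run one past the end of the list.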
Non-Streaming Fallback
Always provide a non-streaming endpoint for clients that cannot handle SSE: mobile apps with limited networking libraries, internal tools that just need the answer, batch processing scripts.
@app.post("/api/chat")
async def chat_non_streaming(request: ChatRequest):
"""Fallback endpoint — returns the complete response."""
try:
response = await openai_client.chat.completions.create(
model=request.model,
messages=[
{"role": "system", "content": request.system_prompt},
{"role": "user", "content": request.message},
],
max_tokens=request.max_tokens,
stream=False,
)
return {
"content": response.choices[0].message.content,
"model": response.model,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens,
},
}
except openai.RateLimitError:
raise HTTPException(status_code=429, detail="Rate limited. Try again later.")
except openai.APIError as e:
raise HTTPException(status_code=502, detail=f"LLM API error: {str(e)}")On the frontend, detect support and fall back automatically:
async function chat(message, onToken, onDone, onError) {
  // Try streaming first
  try {
    if (typeof ReadableStream !== "undefined") {
      return await streamChat(message, onToken, onDone, onError);
    }
  } catch (e) {
    console.warn("Streaming failed, falling back to non-streaming:", e);
  }

  // Fall back to non-streaming
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });
  const data = await response.json();
  onToken(data.content); // Deliver entire response at once
  onDone({ token_count: data.usage.completion_tokens });
}

Complete End-to-End Example
Here is the minimal complete implementation — a FastAPI backend and a vanilla-JavaScript frontend that you can run locally.
Backend (server.py):
import json
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from openai import AsyncOpenAI

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
client = AsyncOpenAI()

class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-4o-mini"

@app.post("/api/stream")
async def stream(request: ChatRequest, raw_request: Request):
    async def generate():
        try:
            response = await client.chat.completions.create(
                model=request.model,
                messages=[{"role": "user", "content": request.message}],
                stream=True,
            )
            async for chunk in response:
                if await raw_request.is_disconnected():
                    break
                delta = chunk.choices[0].delta
                if delta.content:
                    yield f"data: {json.dumps({'t': delta.content})}\n\n"
            yield f"data: {json.dumps({'done': True})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

# Run: uvicorn server:app --reload --port 8000

Frontend (index.html) — vanilla JavaScript, no build step:
<!DOCTYPE html>
<html>
<head>
<title>LLM Stream Demo</title>
<style>
body { font-family: system-ui; max-width: 600px; margin: 40px auto; padding: 0 20px; }
#output { white-space: pre-wrap; background: #f5f5f5; padding: 16px; border-radius: 8px; min-height: 100px; margin: 16px 0; }
input { width: 70%; padding: 8px; }
button { padding: 8px 16px; }
</style>
</head>
<body>
<h2>Streaming LLM Demo</h2>
<form id="form">
<input id="input" placeholder="Ask something..." autofocus />
<button type="submit" id="sendBtn">Send</button>
<button type="button" id="stopBtn" style="display:none">Stop</button>
</form>
<div id="output"></div>
<script>
let controller = null;
const form = document.getElementById('form');
const input = document.getElementById('input');
const output = document.getElementById('output');
const sendBtn = document.getElementById('sendBtn');
const stopBtn = document.getElementById('stopBtn');
form.onsubmit = async (e) => {
e.preventDefault();
const message = input.value.trim();
if (!message) return;
output.textContent = '';
sendBtn.style.display = 'none';
stopBtn.style.display = 'inline';
controller = new AbortController();
try {
const res = await fetch('http://localhost:8000/api/stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({ message }),
signal: controller.signal,
});
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buf = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
const lines = buf.split('\n');
buf = lines.pop();
for (const line of lines) {
if (line.startsWith('data: ')) {
const d = JSON.parse(line.slice(6));
if (d.t) output.textContent += d.t;
if (d.error) output.textContent += '\n[Error: ' + d.error + ']';
}
}
}
} catch (err) {
if (err.name !== 'AbortError') output.textContent += '\n[Error: ' + err.message + ']';
}
sendBtn.style.display = 'inline';
stopBtn.style.display = 'none';
};
stopBtn.onclick = () => { if (controller) controller.abort(); };
</script>
</body>
</html>

Run the backend with uvicorn server:app --reload --port 8000 and open index.html in your browser (or serve it with FastAPI's StaticFiles). You will see tokens stream in real time with a working cancel button.
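The same stream can be consumed outside the browser, which is handy for smoke tests and scripts. Below is a minimal Python client sketch. It assumes the server above is running on localhost:8000, and it uses httpx (a third-party HTTP client — an implementation choice, not something the server requires):

```python
import json

def parse_sse_line(line: str):
    """Return the JSON payload of one 'data: ...' SSE line, or None for
    comments, blank keep-alive lines, and other SSE fields."""
    if line.startswith("data: "):
        return json.loads(line[len("data: "):])
    return None

def stream_chat(message: str, url: str = "http://localhost:8000/api/stream") -> str:
    """POST a message, print tokens as they arrive, return the full text."""
    import httpx  # third-party: pip install httpx

    parts = []
    with httpx.stream("POST", url, json={"message": message}, timeout=None) as resp:
        for line in resp.iter_lines():
            event = parse_sse_line(line)
            if event is None:
                continue
            if "t" in event:
                print(event["t"], end="", flush=True)
                parts.append(event["t"])
            elif "error" in event:
                print(f"\n[Error: {event['error']}]")
                break
            elif event.get("done"):
                print()
                break
    return "".join(parts)
```

With the server up, `stream_chat("What is 2+2?")` prints the answer token by token. For an even quicker check, `curl -N http://localhost:8000/api/stream -H 'Content-Type: application/json' -d '{"message": "hi"}'` shows the raw `data:` events (`-N` disables curl's output buffering).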
Key Takeaways

- Streaming is mandatory for user-facing LLM applications. The difference between 200ms and 10 seconds to first token is the difference between a usable product and an abandoned one.
- Use Server-Sent Events, not WebSockets. SSE is simpler, works through all proxies, has built-in reconnection, and is the right fit for the request-response pattern of LLM interactions.
- Always detect client disconnection. Check is_disconnected() in your stream loop and close the API stream. Otherwise you pay for tokens nobody reads.
- Handle errors mid-stream. The connection can drop at any point. Send structured error events, implement retry logic with exponential backoff, and always have a clear error state in your UI.
- Set X-Accel-Buffering: no in your response headers. Without this, nginx and similar reverse proxies buffer the entire stream before forwarding, which defeats the purpose of streaming entirely.
- Buffer tokens to word boundaries for smoother display. Raw sub-word tokens look janky. Accumulate until a space or punctuation and then flush.
- Provide a non-streaming fallback. Not all clients support SSE. Mobile apps, batch scripts, and some corporate proxies need a simple request-response endpoint.
- Load test with concurrent connections, not requests per second. Streaming endpoints hold connections open. Your bottleneck is concurrent connection capacity, not throughput.
- Cancel means cancel. When a user clicks stop, abort the fetch on the frontend and close the API stream on the backend. Do not let tokens continue generating in the background.
- Keep the complete end-to-end implementation simple. The minimal version is a FastAPI StreamingResponse with an async generator, consumed by fetch with ReadableStream on the frontend. Everything else is polish on top of this core pattern.
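The word-boundary buffering takeaway is small enough to show in full. Here is a sketch of a hypothetical helper — not part of the server above — that sits between the raw token stream and whatever renders it:

```python
def buffer_to_words(tokens):
    """Accumulate raw sub-word tokens and yield chunks only at word
    boundaries (whitespace), so the UI never displays half a word."""
    buf = ""
    for tok in tokens:
        buf += tok
        # Flush everything up to and including the last whitespace character.
        cut = max(buf.rfind(" "), buf.rfind("\n"))
        if cut >= 0:
            yield buf[:cut + 1]
            buf = buf[cut + 1:]
    if buf:  # flush whatever remains when the stream ends
        yield buf

print(list(buffer_to_words(["Hel", "lo", " wor", "ld", "!"])))  # → ['Hello ', 'world!']
```

In the streaming endpoint, you would wrap the delta stream in this generator before yielding SSE events; the tradeoff is slightly higher latency per flush in exchange for smoother rendering.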