Lesson 09 · LLM Engineering in Production · 20 min read

Streaming Responses in Production

April 01, 2026

TL;DR

Streaming reduces perceived latency from 5-10 seconds to a few hundred milliseconds for the first token. Use Server-Sent Events (SSE) for most cases — simpler than WebSockets, works through proxies. Backend: FastAPI with StreamingResponse + async generators. Frontend: EventSource API or fetch with ReadableStream. Handle errors mid-stream (the connection will drop), implement reconnection logic, and add a cancel button. Always have a non-streaming fallback.

A user types a question and clicks send. Without streaming, they stare at a blank screen for 5 to 10 seconds while the model generates the full response. With streaming, the first word appears within 200 milliseconds and text flows in token by token, just like ChatGPT. The total time is identical — but the perceived experience is completely different. Streaming is not a nice-to-have. For any user-facing LLM application, it is mandatory.

This lesson covers the full stack: how streaming works at the protocol level, how to implement it with OpenAI and Anthropic SDKs, how to build a streaming API with FastAPI, how to consume it on the frontend with React, and how to handle the production edge cases — errors mid-stream, client disconnects, backpressure, and reconnection.

Why Streaming Matters

Two metrics define user perception of LLM applications:

Time to First Token (TTFT): How long until the user sees the first word. Without streaming, TTFT equals total generation time (5-15 seconds for long responses). With streaming, TTFT is typically 200-500ms.

Tokens per Second (TPS): How fast text flows in after the first token. Most API providers deliver 30-80 tokens per second, which feels natural — roughly matching human reading speed.

Metric | Without Streaming | With Streaming
Time to first word | 5-15 seconds | 200-500ms
Perceived wait time | Full generation time | Near-instant
User abandonment rate | High (>40% at 10s) | Low (<10%)
Cancel possible | No (wait or refresh) | Yes (stop anytime)

Studies on web performance show that users start abandoning at 3 seconds of perceived inactivity. A 10-second wait for a full LLM response loses nearly half your users. Streaming keeps them engaged because they see progress immediately.
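
You can measure both metrics against your own stack with a few lines of timing around a streamed call. Here is a minimal sketch using the OpenAI SDK (the model name and prompt are placeholders):

import time

from openai import OpenAI

client = OpenAI()

start = time.time()
first_token_at = None
token_count = 0

stream = client.chat.completions.create(
    model="gpt-4o",  # placeholder model
    messages=[{"role": "user", "content": "Explain DNS in two sentences."}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        if first_token_at is None:
            first_token_at = time.time()  # first token observed -> TTFT
        token_count += 1

elapsed = time.time() - start
ttft = (first_token_at - start) if first_token_at else elapsed
tps = token_count / (elapsed - ttft) if elapsed > ttft else 0.0
print(f"TTFT: {ttft * 1000:.0f} ms | {token_count} tokens | {tps:.1f} tok/s after first token")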

SSE vs WebSockets

Two protocols dominate real-time web communication. For LLM streaming, Server-Sent Events (SSE) is almost always the right choice.

Feature | SSE | WebSockets
Direction | Server to client only | Bidirectional
Protocol | HTTP | Upgraded HTTP → WS
Proxy compatibility | Excellent (plain HTTP) | Mixed (some proxies break WS)
Reconnection | Built-in (auto-reconnect) | Manual implementation
Binary data | No (text only) | Yes
Complexity | Low | Medium-High
Browser support | All modern browsers | All modern browsers
Use case | LLM streaming, notifications | Chat, gaming, collaboration

Use SSE when: The client sends a request and the server streams a response. This is the standard LLM interaction pattern. SSE is simpler, works through all proxies and CDNs, and has built-in reconnection.

Use WebSockets when: You need bidirectional streaming — for example, a voice chat application where audio flows both ways simultaneously, or a collaborative editing environment where multiple users stream changes.
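
Before looking at SDK code, it helps to see what SSE looks like on the wire: a long-lived HTTP response whose body is a sequence of data: lines, each event terminated by a blank line. The JSON payloads below follow the event shapes used later in this lesson; the field names are this lesson's convention, not part of the SSE spec.

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache

data: {"type": "token", "content": "Hello"}

data: {"type": "token", "content": " world"}

data: {"type": "done", "token_count": 2}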

Streaming with the OpenAI SDK

The OpenAI SDK supports streaming with a simple stream=True parameter.

Synchronous Streaming

from openai import OpenAI

client = OpenAI()


def stream_openai_sync(prompt: str) -> str:
    """Stream a response from OpenAI, printing tokens as they arrive."""
    full_response = ""

    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt},
        ],
        stream=True,
    )

    for chunk in stream:
        # Each chunk has a choices array with delta content
        delta = chunk.choices[0].delta
        if delta.content:
            print(delta.content, end="", flush=True)
            full_response += delta.content

        # Check for stop
        if chunk.choices[0].finish_reason == "stop":
            break

    print()  # Newline after streaming completes
    return full_response


response = stream_openai_sync("Explain how TCP handshakes work in 3 sentences.")

Async Streaming

For production servers, you need async streaming to handle concurrent requests.

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()


async def stream_openai_async(prompt: str):
    """Async streaming — for use in FastAPI/async frameworks."""
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt},
        ],
        stream=True,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content  # Yield each token as it arrives


# Usage in an async context
async def main():
    async for token in stream_openai_async("What is gradient descent?"):
        print(token, end="", flush=True)
    print()

asyncio.run(main())

Streaming with the Anthropic SDK

Anthropic uses a context manager pattern for streaming, which ensures proper cleanup.

import anthropic

client = anthropic.Anthropic()


def stream_anthropic_sync(prompt: str) -> str:
    """Stream a response from Claude."""
    full_response = ""

    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
            full_response += text

    print()
    return full_response


# Async version
async_client = anthropic.AsyncAnthropic()


async def stream_anthropic_async(prompt: str):
    """Async streaming from Claude."""
    async with async_client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            yield text

The stream.text_stream iterator yields only the text deltas — no parsing of chunk objects needed. The context manager ensures the HTTP connection is properly closed even if an error occurs mid-stream.
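
If you also need usage metadata after streaming, recent versions of the SDK expose the fully accumulated message from the same stream object. A minimal sketch, reusing the client defined above:

def stream_with_usage(prompt: str) -> str:
    """Stream text, then read token usage from the accumulated final message."""
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
        final = stream.get_final_message()  # accumulated Message, including usage

    print(f"\n[{final.usage.output_tokens} output tokens]")
    return final.content[0].text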

Building a Streaming API with FastAPI

Here is a production-ready FastAPI endpoint that streams LLM responses to clients using Server-Sent Events.

import json
import time
import asyncio
import logging
from typing import AsyncGenerator

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from openai import AsyncOpenAI

logger = logging.getLogger(__name__)
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_methods=["POST"],
    allow_headers=["*"],
)

openai_client = AsyncOpenAI()


class ChatRequest(BaseModel):
    message: str
    system_prompt: str = "You are a helpful assistant."
    model: str = "gpt-4o"
    max_tokens: int = 2048


async def generate_sse_stream(
    request: ChatRequest,
    client_request: Request,
) -> AsyncGenerator[str, None]:
    """Generate SSE-formatted stream from OpenAI."""
    try:
        stream = await openai_client.chat.completions.create(
            model=request.model,
            messages=[
                {"role": "system", "content": request.system_prompt},
                {"role": "user", "content": request.message},
            ],
            max_tokens=request.max_tokens,
            stream=True,
        )

        token_count = 0
        start_time = time.time()

        async for chunk in stream:
            # Check if client disconnected
            if await client_request.is_disconnected():
                logger.info("Client disconnected, stopping generation")
                break

            delta = chunk.choices[0].delta
            if delta.content:
                token_count += 1
                # SSE format: "data: {json}\n\n"
                event_data = json.dumps({
                    "type": "token",
                    "content": delta.content,
                })
                yield f"data: {event_data}\n\n"

            if chunk.choices[0].finish_reason == "stop":
                elapsed = time.time() - start_time
                done_data = json.dumps({
                    "type": "done",
                    "token_count": token_count,
                    "elapsed_seconds": round(elapsed, 2),
                    "tokens_per_second": round(token_count / elapsed, 1)
                        if elapsed > 0 else 0,
                })
                yield f"data: {done_data}\n\n"

    except Exception as e:
        logger.error(f"Stream error: {e}")
        error_data = json.dumps({
            "type": "error",
            "message": str(e),
        })
        yield f"data: {error_data}\n\n"


@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest, client_request: Request):
    """SSE endpoint for streaming chat responses."""
    return StreamingResponse(
        generate_sse_stream(request, client_request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )


# Non-streaming fallback
@app.post("/api/chat")
async def chat_non_streaming(request: ChatRequest):
    """Non-streaming endpoint for clients that don't support SSE."""
    response = await openai_client.chat.completions.create(
        model=request.model,
        messages=[
            {"role": "system", "content": request.system_prompt},
            {"role": "user", "content": request.message},
        ],
        max_tokens=request.max_tokens,
    )
    return {
        "content": response.choices[0].message.content,
        "usage": {
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens,
        },
    }

Key details in this implementation:

  • text/event-stream media type tells the browser this is an SSE stream.
  • X-Accel-Buffering: no prevents nginx from buffering the entire response before forwarding. Without this header, nginx collects all SSE events and sends them as one big batch — defeating the purpose of streaming (see the nginx sketch after this list).
  • Cache-Control: no-cache prevents CDNs and browsers from caching the stream.
  • Client disconnect detection using await client_request.is_disconnected() stops generation when the user navigates away or closes the tab, saving API costs.
  • Structured SSE events with a type field so the frontend knows whether it is receiving a token, a completion signal, or an error.
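
If you control the reverse proxy, you can also turn buffering off in the nginx config itself rather than relying on the X-Accel-Buffering header alone. A minimal sketch (the upstream name is a placeholder):

location /api/chat/stream {
    proxy_pass http://app_backend;   # placeholder upstream
    proxy_http_version 1.1;
    proxy_set_header Connection "";  # keep-alive to the upstream
    proxy_buffering off;             # forward SSE events as they arrive
    proxy_cache off;
    proxy_read_timeout 300s;         # allow long-lived streams
}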

Frontend Integration

Using the Fetch API with ReadableStream

The fetch API with ReadableStream gives you the most control. It works everywhere and handles SSE parsing cleanly.

// streamClient.js

export function streamChat(message, onToken, onDone, onError) {
  const controller = new AbortController();

  // Run the streaming loop in the background so the abort function can be
  // handed back to the caller immediately (before the stream finishes).
  (async () => {
    try {
      const response = await fetch("/api/chat/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ message }),
        signal: controller.signal,
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });

        // Parse SSE events from buffer
        const lines = buffer.split("\n");
        buffer = lines.pop(); // Keep incomplete line in buffer

        for (const line of lines) {
          if (line.startsWith("data: ")) {
            const jsonStr = line.slice(6); // Remove "data: " prefix
            try {
              const event = JSON.parse(jsonStr);

              if (event.type === "token") {
                onToken(event.content);
              } else if (event.type === "done") {
                onDone(event);
              } else if (event.type === "error") {
                onError(new Error(event.message));
              }
            } catch (e) {
              // Skip malformed JSON
              console.warn("Failed to parse SSE event:", jsonStr);
            }
          }
        }
      }
    } catch (error) {
      if (error.name === "AbortError") {
        console.log("Stream aborted by user");
      } else {
        onError(error);
      }
    }
  })();

  // Return the abort function immediately so the caller can cancel mid-stream
  return () => controller.abort();
}

React Component

// ChatStream.jsx
import { useState, useRef, useCallback } from "react";
import { streamChat } from "./streamClient";

export default function ChatStream() {
  const [input, setInput] = useState("");
  const [messages, setMessages] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const abortRef = useRef(null);

  const handleSubmit = useCallback(
    async (e) => {
      e.preventDefault();
      if (!input.trim() || isStreaming) return;

      const userMessage = input.trim();
      setInput("");
      setIsStreaming(true);

      // Add user message
      setMessages((prev) => [...prev, { role: "user", content: userMessage }]);

      // Add empty assistant message (will be filled by streaming)
      setMessages((prev) => [
        ...prev,
        { role: "assistant", content: "", streaming: true },
      ]);

      // Start streaming
      const abort = streamChat(
        userMessage,
        // onToken — append each token to the assistant message
        (token) => {
          setMessages((prev) => {
            const updated = [...prev];
            const last = updated[updated.length - 1];
            updated[updated.length - 1] = {
              ...last,
              content: last.content + token,
            };
            return updated;
          });
        },
        // onDone
        (stats) => {
          setMessages((prev) => {
            const updated = [...prev];
            updated[updated.length - 1] = {
              ...updated[updated.length - 1],
              streaming: false,
              stats,
            };
            return updated;
          });
          setIsStreaming(false);
        },
        // onError
        (error) => {
          console.error("Stream error:", error);
          setMessages((prev) => {
            const updated = [...prev];
            updated[updated.length - 1] = {
              ...updated[updated.length - 1],
              content:
                updated[updated.length - 1].content +
                "\n\n[Error: Stream interrupted]",
              streaming: false,
              error: true,
            };
            return updated;
          });
          setIsStreaming(false);
        }
      );

      abortRef.current = abort;
    },
    [input, isStreaming]
  );

  const handleCancel = () => {
    if (abortRef.current) {
      abortRef.current();
      setIsStreaming(false);
    }
  };

  return (
    <div style={{ maxWidth: 700, margin: "0 auto", padding: 20 }}>
      <div style={{ minHeight: 400, marginBottom: 20 }}>
        {messages.map((msg, i) => (
          <div
            key={i}
            style={{
              padding: "12px 16px",
              marginBottom: 8,
              borderRadius: 8,
              background: msg.role === "user" ? "#e3f2fd" : "#f5f5f5",
              whiteSpace: "pre-wrap",
            }}
          >
            <strong>{msg.role === "user" ? "You" : "Assistant"}</strong>
            <div>{msg.content}</div>
            {msg.streaming && <span className="cursor-blink">|</span>}
            {msg.stats && (
              <div style={{ fontSize: 12, color: "#888", marginTop: 4 }}>
                {msg.stats.token_count} tokens in {msg.stats.elapsed_seconds}s (
                {msg.stats.tokens_per_second} tok/s)
              </div>
            )}
          </div>
        ))}
      </div>

      <form onSubmit={handleSubmit} style={{ display: "flex", gap: 8 }}>
        <input
          type="text"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="Type a message..."
          disabled={isStreaming}
          style={{ flex: 1, padding: "8px 12px", borderRadius: 6 }}
        />
        {isStreaming ? (
          <button type="button" onClick={handleCancel}>
            Stop
          </button>
        ) : (
          <button type="submit">Send</button>
        )}
      </form>
    </div>
  );
}

Using the EventSource API

For simpler cases (GET requests, no custom headers), the native EventSource API is even easier:

function streamWithEventSource(query) {
  // Note: EventSource only supports GET requests
  const url = `/api/chat/stream?message=${encodeURIComponent(query)}`;
  const source = new EventSource(url);

  source.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.type === "token") {
      document.getElementById("output").textContent += data.content;
    } else if (data.type === "done") {
      source.close();
    }
  };

  source.onerror = (error) => {
    console.error("EventSource error:", error);
    source.close();
    // EventSource auto-reconnects by default — close explicitly if you don't want that
  };

  return source; // caller can call source.close() to cancel
}

The limitation of EventSource is that it only supports GET requests — you cannot send a JSON body. For LLM applications that need to send conversation history, use fetch with ReadableStream instead.
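
If you do want EventSource support, you can add a GET variant of the streaming endpoint that takes the message as a query parameter. A minimal sketch, reusing generate_sse_stream and ChatRequest from the FastAPI section above (the query-parameter name is a choice, not a standard):

from fastapi import Query


@app.get("/api/chat/stream")
async def chat_stream_get(client_request: Request, message: str = Query(...)):
    """GET variant of the streaming endpoint so browsers can use EventSource."""
    request = ChatRequest(message=message)
    return StreamingResponse(
        generate_sse_stream(request, client_request),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )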

Error Handling Mid-Stream

Streams can fail in many ways: the API provider has an outage, the network drops, the server runs out of memory, or the model hits a content filter mid-response. You must handle all of these gracefully.

import asyncio
import json
import logging
from enum import Enum

import openai  # for the typed exception classes (RateLimitError, APIError)

# Reuses openai_client, ChatRequest, Request, and AsyncGenerator from the
# FastAPI app defined above.

logger = logging.getLogger(__name__)


class StreamErrorType(Enum):
    API_ERROR = "api_error"
    TIMEOUT = "timeout"
    CONTENT_FILTER = "content_filter"
    RATE_LIMIT = "rate_limit"
    NETWORK = "network_error"
    UNKNOWN = "unknown"


async def resilient_stream(
    request: ChatRequest,
    client_request: Request,
    max_retries: int = 2,
    timeout_seconds: int = 60,
) -> AsyncGenerator[str, None]:
    """Stream with error recovery and timeout handling."""
    retries = 0
    accumulated_response = ""

    while retries <= max_retries:
        try:
            stream = await openai_client.chat.completions.create(
                model=request.model,
                messages=[
                    {"role": "system", "content": request.system_prompt},
                    {"role": "user", "content": request.message},
                ],
                max_tokens=request.max_tokens,
                stream=True,
            )

            # Apply the timeout per chunk so a stalled stream raises TimeoutError
            # even after generation has started.
            stream_iter = stream.__aiter__()
            while True:
                try:
                    chunk = await asyncio.wait_for(
                        stream_iter.__anext__(), timeout=timeout_seconds
                    )
                except StopAsyncIteration:
                    break

                if await client_request.is_disconnected():
                    logger.info("Client disconnected")
                    return

                delta = chunk.choices[0].delta
                if delta.content:
                    accumulated_response += delta.content
                    yield f"data: {json.dumps({'type': 'token', 'content': delta.content})}\n\n"

                if chunk.choices[0].finish_reason == "stop":
                    yield f"data: {json.dumps({'type': 'done'})}\n\n"
                    return

                # Content filter triggered
                if chunk.choices[0].finish_reason == "content_filter":
                    yield f"data: {json.dumps({'type': 'error', 'error_type': 'content_filter', 'message': 'Response filtered by content policy'})}\n\n"
                    return

        except asyncio.TimeoutError:
            logger.warning(f"Stream timeout after {timeout_seconds}s")
            if accumulated_response:
                # Partial response exists — send what we have
                yield f"data: {json.dumps({'type': 'error', 'error_type': 'timeout', 'message': 'Generation timed out. Partial response delivered.'})}\n\n"
                return
            retries += 1

        except openai.RateLimitError:
            logger.warning("Rate limited, waiting before retry")
            yield f"data: {json.dumps({'type': 'status', 'message': 'Rate limited, retrying...'})}\n\n"
            await asyncio.sleep(2 ** retries)  # Exponential backoff
            retries += 1

        except openai.APIError as e:
            logger.error(f"API error: {e}")
            if retries < max_retries:
                retries += 1
                yield f"data: {json.dumps({'type': 'status', 'message': 'Reconnecting...'})}\n\n"
                await asyncio.sleep(1)
            else:
                yield f"data: {json.dumps({'type': 'error', 'error_type': 'api_error', 'message': f'API error after {max_retries} retries: {str(e)}'})}\n\n"
                return

        except Exception as e:
            logger.error(f"Unexpected error: {e}", exc_info=True)
            yield f"data: {json.dumps({'type': 'error', 'error_type': 'unknown', 'message': 'An unexpected error occurred'})}\n\n"
            return

    yield f"data: {json.dumps({'type': 'error', 'error_type': 'max_retries', 'message': 'Failed after maximum retries'})}\n\n"

Backpressure Handling

Backpressure occurs when the server produces tokens faster than the client can consume them. In practice, this is rare for LLM streaming (models generate at 30-80 tokens/second, well within network capacity), but it matters when you are streaming to mobile clients on slow connections or when your server buffers multiple concurrent streams.

import asyncio
import json
from collections import deque

# Reuses ChatRequest, AsyncGenerator, and openai_client from the FastAPI app above.


class BackpressureBuffer:
    """Buffer tokens with backpressure to prevent memory overflow."""

    def __init__(self, max_buffer_size: int = 1000):
        self.buffer: deque[str] = deque(maxlen=max_buffer_size)
        self.event = asyncio.Event()
        self.done = False

    async def put(self, token: str):
        """Add a token to the buffer. Waits while the buffer is full."""
        while len(self.buffer) >= self.buffer.maxlen:
            # Buffer full — wait for consumer to drain
            self.event.clear()
            try:
                await asyncio.wait_for(self.event.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                continue  # Don't crash the producer; just re-check the buffer
        self.buffer.append(token)
        self.event.set()

    async def get(self) -> str | None:
        """Get the next token. Returns None when stream is done."""
        while not self.buffer and not self.done:
            self.event.clear()
            try:
                await asyncio.wait_for(self.event.wait(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
        if self.buffer:
            token = self.buffer.popleft()
            self.event.set()  # Notify producer that space is available
            return token
        return None

    def finish(self):
        self.done = True
        self.event.set()


async def buffered_stream(request: ChatRequest) -> AsyncGenerator[str, None]:
    """Stream with backpressure buffer between producer and consumer."""
    buffer = BackpressureBuffer(max_buffer_size=500)

    async def producer():
        """Fetch tokens from API and put them in the buffer."""
        try:
            stream = await openai_client.chat.completions.create(
                model=request.model,
                messages=[
                    {"role": "system", "content": request.system_prompt},
                    {"role": "user", "content": request.message},
                ],
                stream=True,
            )
            async for chunk in stream:
                delta = chunk.choices[0].delta
                if delta.content:
                    await buffer.put(delta.content)
        finally:
            # Always mark the buffer done, even on error, so the consumer exits
            buffer.finish()

    # Start producer in background
    producer_task = asyncio.create_task(producer())

    # Consumer yields tokens as SSE events
    try:
        while True:
            token = await buffer.get()
            if token is None:
                break
            yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
    finally:
        producer_task.cancel()

Token Buffering: Word-Level Delivery

Raw LLM streaming delivers sub-word tokens. A user might see “Hel” then “lo,” then “ world” — which looks janky. Buffering to word boundaries produces smoother output.

async def word_buffered_stream(
    raw_stream: AsyncGenerator[str, None],
) -> AsyncGenerator[str, None]:
    """Buffer tokens and yield at word boundaries for smoother display."""
    buffer = ""

    async for token in raw_stream:
        buffer += token

        # Yield complete words (text up to and including the last space)
        last_space = buffer.rfind(" ")
        if last_space > 0:
            yield buffer[: last_space + 1]
            buffer = buffer[last_space + 1 :]

    # Flush remaining buffer
    if buffer:
        yield buffer


# Usage in SSE generator
async def smooth_sse_stream(request: ChatRequest) -> AsyncGenerator[str, None]:
    stream = await openai_client.chat.completions.create(
        model=request.model,
        messages=[{"role": "user", "content": request.message}],
        stream=True,
    )

    async def raw_tokens():
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async for word_group in word_buffered_stream(raw_tokens()):
        yield f"data: {json.dumps({'type': 'token', 'content': word_group})}\n\n"

    yield f"data: {json.dumps({'type': 'done'})}\n\n"

Structured Output and Streaming

When you need JSON output from a streaming LLM, you face a challenge: the JSON is invalid until the last closing brace arrives. Two approaches.

Approach 1: Stream the Full Text, Parse at the End

The simplest option. Show the user raw text as it streams, then parse the complete JSON when done.

async def stream_then_parse(request: ChatRequest) -> AsyncGenerator[str, None]:
    """Stream tokens to client, parse JSON from complete response."""
    full_response = ""

    stream = await openai_client.chat.completions.create(
        model=request.model,
        messages=[{"role": "user", "content": request.message}],
        stream=True,
        response_format={"type": "json_object"},
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            full_response += delta.content
            yield f"data: {json.dumps({'type': 'token', 'content': delta.content})}\n\n"

    # Parse the complete JSON
    try:
        parsed = json.loads(full_response)
        yield f"data: {json.dumps({'type': 'done', 'parsed': parsed})}\n\n"
    except json.JSONDecodeError as e:
        yield f"data: {json.dumps({'type': 'error', 'message': f'Invalid JSON: {e}'})}\n\n"

Approach 2: Incremental JSON Parsing

For long JSON responses, you can parse partial JSON to extract completed fields.

import json
from json import JSONDecodeError


class IncrementalJSONParser:
    """Parse JSON fields as they become complete in a stream."""

    def __init__(self):
        self.buffer = ""
        self.extracted_fields = {}

    def feed(self, token: str) -> dict | None:
        """Feed a token and return newly completed fields if any."""
        self.buffer += token

        # Try to parse completed key-value pairs.
        # This is a heuristic that works for flat JSON objects.
        try:
            # Strip any dangling fragment (a trailing comma or the opening
            # quote of the next key), then close the object so completed pairs parse
            partial = self.buffer.rstrip().rstrip('"').rstrip().rstrip(",")
            if not partial.endswith("}"):
                partial += "}"
            parsed = json.loads(partial)

            # Find new fields
            new_fields = {}
            for key, value in parsed.items():
                if key not in self.extracted_fields:
                    new_fields[key] = value
                    self.extracted_fields[key] = value

            return new_fields if new_fields else None

        except JSONDecodeError:
            return None


# Usage
parser = IncrementalJSONParser()

# Simulating streaming tokens
tokens = ['{"', 'name', '": "', 'John', ' Doe', '", "', 'age', '": ', '30', ', "', 'city', '": "', 'NYC', '"}']

for token in tokens:
    new_fields = parser.feed(token)
    if new_fields:
        print(f"New field(s) completed: {new_fields}")
# Output:
# New field(s) completed: {'name': 'John Doe'}
# New field(s) completed: {'age': 30}
# New field(s) completed: {'city': 'NYC'}

Cancel/Abort: Letting Users Stop Generation

Users must be able to stop a response mid-stream. The frontend sends an abort signal, the backend detects the disconnection and stops calling the API.

Backend cancel detection (FastAPI):

@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest, client_request: Request):
    async def generate():
        stream = await openai_client.chat.completions.create(
            model=request.model,
            messages=[{"role": "user", "content": request.message}],
            stream=True,
        )

        async for chunk in stream:
            # This is the critical check — stop generating if client is gone
            if await client_request.is_disconnected():
                logger.info("Client cancelled stream")
                # Close the API stream to stop billing
                await stream.close()
                return

            delta = chunk.choices[0].delta
            if delta.content:
                yield f"data: {json.dumps({'type': 'token', 'content': delta.content})}\n\n"

        yield f"data: {json.dumps({'type': 'done'})}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

Frontend abort (already shown in the React component):

// The AbortController pattern
const controller = new AbortController();
fetch("/api/chat/stream", { signal: controller.signal });

// To cancel:
controller.abort(); // Closes the connection, triggers AbortError on frontend

When the frontend calls controller.abort(), the fetch connection closes. On the server side, client_request.is_disconnected() returns True, and we stop consuming from the OpenAI stream. This is important because OpenAI charges per token generated — if you do not close the stream, tokens continue generating (and billing) even though nobody is reading them.

Reconnection Logic

Network interruptions happen. A solid reconnection strategy maintains the user experience.

// reconnectingStream.js

export class ReconnectingStream {
  constructor(url, options = {}) {
    this.url = url;
    this.options = options;
    this.maxRetries = options.maxRetries || 3;
    this.retryDelay = options.retryDelay || 1000;
    this.onToken = options.onToken || (() => {});
    this.onDone = options.onDone || (() => {});
    this.onError = options.onError || (() => {});
    this.onReconnect = options.onReconnect || (() => {});

    this.retries = 0;
    this.accumulatedText = "";
    this.controller = null;
  }

  async start(body) {
    this.body = body;
    this.retries = 0;
    await this._connect();
  }

  async _connect() {
    this.controller = new AbortController();

    try {
      const response = await fetch(this.url, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          ...this.body,
          // Tell server how much text we already have
          resume_from: this.accumulatedText.length,
        }),
        signal: this.controller.signal,
      });

      if (!response.ok) throw new Error(`HTTP ${response.status}`);

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop();

        for (const line of lines) {
          if (line.startsWith("data: ")) {
            const event = JSON.parse(line.slice(6));
            if (event.type === "token") {
              this.accumulatedText += event.content;
              this.onToken(event.content);
            } else if (event.type === "done") {
              this.onDone(event);
              return;
            } else if (event.type === "error") {
              throw new Error(event.message);
            }
          }
        }
      }
    } catch (error) {
      if (error.name === "AbortError") return; // User cancelled

      if (this.retries < this.maxRetries) {
        this.retries++;
        const delay = this.retryDelay * Math.pow(2, this.retries - 1);
        this.onReconnect(this.retries, delay);
        await new Promise((r) => setTimeout(r, delay));
        await this._connect();
      } else {
        this.onError(error);
      }
    }
  }

  cancel() {
    if (this.controller) {
      this.controller.abort();
    }
  }
}

// Usage
const stream = new ReconnectingStream("/api/chat/stream", {
  maxRetries: 3,
  retryDelay: 1000,
  onToken: (token) => appendToUI(token),
  onDone: (stats) => showComplete(stats),
  onError: (err) => showError(err),
  onReconnect: (attempt, delay) => showReconnecting(attempt),
});

stream.start({ message: "Explain quantum computing" });

// Cancel button:
document.getElementById("cancel").onclick = () => stream.cancel();

The resume_from parameter tells the server how much text the client already received. The server can skip ahead in the response or regenerate with a note to continue from the breakpoint. Full resume is complex — in practice, most applications simply restart the generation and rely on the retry being fast enough that the user does not notice.
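
For illustration, the crudest server-side interpretation of resume_from is to regenerate and suppress the characters the client reports it already has. This is a sketch only: it reuses openai_client, ChatRequest, and json from earlier, and regeneration is not deterministic, so the resumed text may not match what was already shown.

async def generate_with_resume(
    request: ChatRequest,
    resume_from: int,
    client_request: Request,
) -> AsyncGenerator[str, None]:
    """Regenerate the answer but skip the first `resume_from` characters."""
    sent = 0
    stream = await openai_client.chat.completions.create(
        model=request.model,
        messages=[{"role": "user", "content": request.message}],
        stream=True,
    )
    async for chunk in stream:
        if await client_request.is_disconnected():
            return
        delta = chunk.choices[0].delta
        if not delta.content:
            continue
        text = delta.content
        if sent + len(text) <= resume_from:
            sent += len(text)  # Entirely within text the client already has
            continue
        if sent < resume_from:
            text = text[resume_from - sent:]  # Chunk straddles the resume point
        sent += len(delta.content)
        yield f"data: {json.dumps({'type': 'token', 'content': text})}\n\n"

    yield f"data: {json.dumps({'type': 'done'})}\n\n"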

Load Testing Streaming Endpoints

Streaming endpoints need different load testing strategies than regular APIs. You need to measure concurrent connection capacity, not just requests per second.

# load_test_streaming.py
import asyncio
import time
import json
import aiohttp
from dataclasses import dataclass


@dataclass
class StreamMetrics:
    ttft_ms: float  # Time to first token
    total_ms: float
    token_count: int
    success: bool
    error: str = ""


async def stream_one_request(
    session: aiohttp.ClientSession,
    url: str,
    payload: dict,
) -> StreamMetrics:
    """Make one streaming request and measure metrics."""
    start = time.time()
    first_token_time = None
    token_count = 0

    try:
        async with session.post(url, json=payload) as response:
            if response.status != 200:
                return StreamMetrics(0, 0, 0, False, f"HTTP {response.status}")

            async for line in response.content:
                decoded = line.decode().strip()
                if decoded.startswith("data: "):
                    event = json.loads(decoded[6:])
                    if event["type"] == "token":
                        if first_token_time is None:
                            first_token_time = time.time()
                        token_count += 1
                    elif event["type"] == "done":
                        break

        total = time.time() - start
        ttft = (first_token_time - start) * 1000 if first_token_time else 0
        return StreamMetrics(ttft, total * 1000, token_count, True)

    except Exception as e:
        total = time.time() - start
        return StreamMetrics(0, total * 1000, token_count, False, str(e))


async def load_test(
    url: str,
    concurrent_users: int,
    requests_per_user: int,
    payload: dict,
):
    """Run a streaming load test."""
    print(f"Load test: {concurrent_users} concurrent users, "
          f"{requests_per_user} requests each")
    print(f"URL: {url}")
    print("-" * 60)

    connector = aiohttp.TCPConnector(limit=concurrent_users)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = []
        for _ in range(concurrent_users * requests_per_user):
            tasks.append(stream_one_request(session, url, payload))

        start = time.time()
        results = await asyncio.gather(*tasks)
        elapsed = time.time() - start

    # Analyze results
    successes = [r for r in results if r.success]
    failures = [r for r in results if not r.success]

    if successes:
        ttfts = [r.ttft_ms for r in successes]
        totals = [r.total_ms for r in successes]
        tokens = [r.token_count for r in successes]

        print(f"\nResults ({len(results)} total requests in {elapsed:.1f}s):")
        print(f"  Successes: {len(successes)}")
        print(f"  Failures:  {len(failures)}")
        print(f"\n  TTFT (ms):     p50={sorted(ttfts)[len(ttfts)//2]:.0f}  "
              f"p95={sorted(ttfts)[int(len(ttfts)*0.95)]:.0f}  "
              f"p99={sorted(ttfts)[int(len(ttfts)*0.99)]:.0f}")
        print(f"  Total (ms):    p50={sorted(totals)[len(totals)//2]:.0f}  "
              f"p95={sorted(totals)[int(len(totals)*0.95)]:.0f}")
        print(f"  Tokens/req:    avg={sum(tokens)/len(tokens):.0f}")
        print(f"  Throughput:    {len(successes)/elapsed:.1f} req/s")

    if failures:
        errors = {}
        for f in failures:
            errors[f.error] = errors.get(f.error, 0) + 1
        print(f"\n  Errors:")
        for error, count in errors.items():
            print(f"    {error}: {count}")


# Run the load test
asyncio.run(load_test(
    url="http://localhost:8000/api/chat/stream",
    concurrent_users=20,
    requests_per_user=5,
    payload={"message": "What is 2+2?", "max_tokens": 100},
))

Key things to watch in load test results:

  • TTFT p95 under 2 seconds — if 5% of users wait more than 2 seconds for the first token, your server is overloaded
  • Connection failures — if aiohttp reports connection errors, your server’s connection limit is too low
  • Memory growth — monitor server RSS during the test. Each SSE connection holds state in memory. 1000 concurrent connections should not use more than a few hundred MB

Non-Streaming Fallback

Always provide a non-streaming endpoint for clients that cannot handle SSE: mobile apps with limited networking libraries, internal tools that just need the answer, batch processing scripts.

import openai  # for typed exceptions; reuses app, openai_client, ChatRequest from earlier


@app.post("/api/chat")
async def chat_non_streaming(request: ChatRequest):
    """Fallback endpoint — returns the complete response."""
    try:
        response = await openai_client.chat.completions.create(
            model=request.model,
            messages=[
                {"role": "system", "content": request.system_prompt},
                {"role": "user", "content": request.message},
            ],
            max_tokens=request.max_tokens,
            stream=False,
        )

        return {
            "content": response.choices[0].message.content,
            "model": response.model,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens,
            },
        }

    except openai.RateLimitError:
        raise HTTPException(status_code=429, detail="Rate limited. Try again later.")
    except openai.APIError as e:
        raise HTTPException(status_code=502, detail=f"LLM API error: {str(e)}")

On the frontend, detect support and fall back automatically:

async function chat(message, onToken, onDone, onError) {
  // Try streaming first
  try {
    if (typeof ReadableStream !== "undefined") {
      return await streamChat(message, onToken, onDone, onError);
    }
  } catch (e) {
    console.warn("Streaming failed, falling back to non-streaming:", e);
  }

  // Fallback to non-streaming
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });
  const data = await response.json();
  onToken(data.content); // Deliver entire response at once
  onDone({ token_count: data.usage.completion_tokens });
}

Complete End-to-End Example

Here is the minimal complete implementation — a FastAPI backend and React frontend that you can run locally.

Backend (server.py):

import json
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from openai import AsyncOpenAI

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
client = AsyncOpenAI()


class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-4o-mini"


@app.post("/api/stream")
async def stream(request: ChatRequest, raw_request: Request):
    async def generate():
        try:
            stream = await client.chat.completions.create(
                model=request.model,
                messages=[{"role": "user", "content": request.message}],
                stream=True,
            )
            async for chunk in stream:
                if await raw_request.is_disconnected():
                    break
                delta = chunk.choices[0].delta
                if delta.content:
                    yield f"data: {json.dumps({'t': delta.content})}\n\n"
            yield f"data: {json.dumps({'done': True})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


# Run: uvicorn server:app --reload --port 8000

Frontend (index.html) — vanilla JavaScript, no build step:

<!DOCTYPE html>
<html>
<head>
  <title>LLM Stream Demo</title>
  <style>
    body { font-family: system-ui; max-width: 600px; margin: 40px auto; padding: 0 20px; }
    #output { white-space: pre-wrap; background: #f5f5f5; padding: 16px; border-radius: 8px; min-height: 100px; margin: 16px 0; }
    input { width: 70%; padding: 8px; }
    button { padding: 8px 16px; }
  </style>
</head>
<body>
  <h2>Streaming LLM Demo</h2>
  <form id="form">
    <input id="input" placeholder="Ask something..." autofocus />
    <button type="submit" id="sendBtn">Send</button>
    <button type="button" id="stopBtn" style="display:none">Stop</button>
  </form>
  <div id="output"></div>

  <script>
    let controller = null;
    const form = document.getElementById('form');
    const input = document.getElementById('input');
    const output = document.getElementById('output');
    const sendBtn = document.getElementById('sendBtn');
    const stopBtn = document.getElementById('stopBtn');

    form.onsubmit = async (e) => {
      e.preventDefault();
      const message = input.value.trim();
      if (!message) return;

      output.textContent = '';
      sendBtn.style.display = 'none';
      stopBtn.style.display = 'inline';
      controller = new AbortController();

      try {
        const res = await fetch('http://localhost:8000/api/stream', {
          method: 'POST',
          headers: {'Content-Type': 'application/json'},
          body: JSON.stringify({ message }),
          signal: controller.signal,
        });

        const reader = res.body.getReader();
        const decoder = new TextDecoder();
        let buf = '';

        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          buf += decoder.decode(value, { stream: true });
          const lines = buf.split('\n');
          buf = lines.pop();
          for (const line of lines) {
            if (line.startsWith('data: ')) {
              const d = JSON.parse(line.slice(6));
              if (d.t) output.textContent += d.t;
              if (d.error) output.textContent += '\n[Error: ' + d.error + ']';
            }
          }
        }
      } catch (err) {
        if (err.name !== 'AbortError') output.textContent += '\n[Error: ' + err.message + ']';
      }

      sendBtn.style.display = 'inline';
      stopBtn.style.display = 'none';
    };

    stopBtn.onclick = () => { if (controller) controller.abort(); };
  </script>
</body>
</html>

Run the backend with uvicorn server:app --reload --port 8000 and open index.html in your browser (or serve it with FastAPI’s StaticFiles). You will see tokens stream in real time with a working cancel button.
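
If you would rather serve the page from the same origin (and drop the CORS wildcard), two extra lines in server.py mount the directory with FastAPI's StaticFiles, assuming index.html sits next to server.py and the mount is added after the API routes:

from fastapi.staticfiles import StaticFiles

# Mount last so it does not shadow the /api routes defined above
app.mount("/", StaticFiles(directory=".", html=True), name="static")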

Key Takeaways

  1. Streaming is mandatory for user-facing LLM applications. The difference between 200ms and 10 seconds for first token is the difference between a usable product and an abandoned one.

  2. Use Server-Sent Events, not WebSockets. SSE is simpler, works through all proxies, has built-in reconnection, and is the right fit for the request-response pattern of LLM interactions.

  3. Always detect client disconnection. Check is_disconnected() in your stream loop and close the API stream. Otherwise you pay for tokens nobody reads.

  4. Handle errors mid-stream. The connection can drop at any point. Send structured error events, implement retry logic with exponential backoff, and always have a clear error state in your UI.

  5. Set X-Accel-Buffering: no in your response headers. Without this, nginx and similar reverse proxies buffer the entire stream before forwarding, which defeats the purpose of streaming entirely.

  6. Buffer tokens to word boundaries for smoother display. Raw sub-word tokens look janky. Accumulate until a space or punctuation and then flush.

  7. Provide a non-streaming fallback. Not all clients support SSE. Mobile apps, batch scripts, and some corporate proxies need a simple request-response endpoint.

  8. Load test with concurrent connections, not requests per second. Streaming endpoints hold connections open. Your bottleneck is concurrent connection capacity, not throughput.

  9. Cancel means cancel. When a user clicks stop, abort the fetch on the frontend and close the API stream on the backend. Do not let tokens continue generating in the background.

  10. Keep the complete end-to-end implementation simple. The minimal version is a FastAPI StreamingResponse with an async generator, consumed by fetch with ReadableStream on the frontend. Everything else is polish on top of this core pattern.