Streaming LLM Responses — Server-Sent Events, Backpressure, and Error Handling

Author: Sanjeev Sharma (@webcoderspeed1)

Introduction
Streaming LLM responses dramatically improves perceived latency by sending tokens as they arrive. But streaming introduces complexity: managing backpressure when clients can't keep up, handling mid-stream failures, buffering for tool calls, and graceful cancellation. This post covers Server-Sent Events implementation, OpenAI streaming APIs, backpressure handling, error recovery, and production patterns.
- Server-Sent Events in Node.js/Express
- OpenAI Streaming API Integration
- Handling Mid-Stream Errors
- Content Buffering for Tool Calls
- Client Reconnection and Resumption
- TTFB Optimization
- Abort Controller Pattern
- Streaming LLM Implementation Checklist
- Conclusion
Server-Sent Events in Node.js/Express
SSE is HTTP-based streaming for server→client updates, and is simpler than WebSockets when all you need is one-way delivery from the server.
import { Request, Response } from 'express';
import OpenAI from 'openai';
class SSEStream {
private response: Response;
private abortController: AbortController;
private messageQueue: string[] = [];
private isFlushing = false;
private isConnected = true;
constructor(response: Response) {
this.response = response;
this.abortController = new AbortController();
// Set SSE headers
this.response.setHeader('Content-Type', 'text/event-stream');
this.response.setHeader('Cache-Control', 'no-cache');
this.response.setHeader('Connection', 'keep-alive');
this.response.setHeader('X-Accel-Buffering', 'no'); // Disable proxy buffering
// Handle client disconnect
this.response.on('close', () => {
this.isConnected = false;
this.abortController.abort();
});
this.response.on('error', (err) => {
console.error('SSE response error:', err);
this.isConnected = false;
this.abortController.abort();
});
}
async send(data: any, eventType: string = 'message'): Promise<boolean> {
if (!this.isConnected) {
return false;
}
const message = `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`;
this.messageQueue.push(message);
// Flush asynchronously to avoid blocking
if (!this.isFlushing) {
this.flush();
}
return true;
}
private async flush(): Promise<void> {
if (this.isFlushing || this.messageQueue.length === 0) {
return;
}
this.isFlushing = true;
try {
while (this.messageQueue.length > 0 && this.isConnected) {
const message = this.messageQueue.shift()!;
// Check if write would block (backpressure)
const canContinue = this.response.write(message);
if (!canContinue) {
// Drain event: wait for writable before continuing
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('SSE write timeout'));
}, 5000);
this.response.once('drain', () => {
clearTimeout(timeout);
resolve();
});
this.response.once('error', () => {
clearTimeout(timeout);
reject(new Error('SSE response error'));
});
});
}
}
} catch (error) {
console.error('SSE flush error:', error);
this.isConnected = false;
} finally {
this.isFlushing = false;
// Continue flushing if messages arrived during flush
if (this.messageQueue.length > 0 && this.isConnected) {
this.flush();
}
}
}
async close(reason: string = 'Stream closed'): Promise<void> {
if (this.isConnected) {
await this.send({ type: 'done', reason }, 'close');
// Wait for queued messages to flush before ending the response,
// otherwise end() can race the async flush and drop the final frames
while (this.isFlushing || this.messageQueue.length > 0) {
await new Promise(resolve => setTimeout(resolve, 10));
}
this.response.end();
}
this.isConnected = false;
}
getAbortSignal(): AbortSignal {
return this.abortController.signal;
}
isAlive(): boolean {
return this.isConnected;
}
}
// Express route handler (assumes `const app = express()` with JSON body parsing)
app.post('/api/stream', async (req: Request, res: Response) => {
const stream = new SSEStream(res);
const { prompt, model } = req.body;
try {
// Start streaming LLM response
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const completion = await openai.chat.completions.create(
{
model,
messages: [{ role: 'user', content: prompt }],
stream: true,
},
// Request options go in the second argument, not the body;
// the signal cancels the upstream request if the client disconnects
{ signal: stream.getAbortSignal() }
);
let tokenCount = 0;
for await (const chunk of completion) {
if (!stream.isAlive()) {
break;
}
const content = chunk.choices[0]?.delta?.content;
if (content) {
tokenCount++;
// Send token with metadata
const sent = await stream.send({
type: 'token',
content,
tokenCount,
timestamp: Date.now(),
});
if (!sent) {
console.warn('Failed to send token to client');
break;
}
}
}
await stream.close(`Stream completed with ${tokenCount} tokens`);
} catch (error) {
console.error('Stream error:', error);
if (stream.isAlive()) {
await stream.send(
{
type: 'error',
message: error instanceof Error ? error.message : 'Unknown error',
},
'error'
);
await stream.close('Stream closed due to error');
}
}
});
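One more production concern for the route above: proxies and load balancers often drop connections that stay silent between tokens. The SSE spec allows comment frames (lines starting with `:`) that clients ignore, which makes them ideal keep-alive pings. A minimal sketch — the frame helpers follow the spec, while `startHeartbeat` and the 15-second interval are illustrative choices, not part of the implementation above:

```typescript
// SSE frame helpers plus a keep-alive heartbeat. Comment frames
// (lines beginning with ':') are ignored by EventSource clients but
// keep idle connections from being reaped by intermediaries.

export function sseFrame(eventType: string, data: unknown): string {
  // One SSE frame: event name, data line, then a blank line
  return `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`;
}

export function sseComment(text: string): string {
  // Comment frame, ignored by clients per the SSE spec
  return `: ${text}\n\n`;
}

export function startHeartbeat(
  write: (frame: string) => void,
  intervalMs = 15000 // Illustrative: below common 30-60s idle timeouts
): () => void {
  const timer = setInterval(() => write(sseComment('ping')), intervalMs);
  return () => clearInterval(timer); // Call this on close/disconnect
}
```

In the route handler, you would call `startHeartbeat(frame => res.write(frame))` after setting headers and invoke the returned stop function in the `close` listener.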
OpenAI Streaming API Integration
Handle OpenAI's streaming response format with error recovery.
interface StreamConfig {
model: string;
temperature: number;
maxTokens: number;
topP: number;
frequencyPenalty?: number;
presencePenalty?: number;
}
class OpenAIStreamer {
private openai: OpenAI;
constructor(apiKey: string) {
this.openai = new OpenAI({ apiKey });
}
async *streamCompletion(
messages: Array<{ role: string; content: string }>,
config: StreamConfig,
signal?: AbortSignal
): AsyncGenerator<{ type: string; content?: string; error?: string; metadata?: any }> {
try {
const stream = await this.openai.chat.completions.create(
{
model: config.model,
messages,
temperature: config.temperature,
max_tokens: config.maxTokens,
top_p: config.topP,
stream: true,
},
{ signal } // Request options argument, not a body parameter
);
let tokenCount = 0;
let buffer = '';
for await (const chunk of stream) {
// Handle finish reason (model completed response)
if (chunk.choices[0]?.finish_reason) {
yield {
type: 'finish',
metadata: {
reason: chunk.choices[0].finish_reason,
tokenCount,
},
};
return;
}
const content = chunk.choices[0]?.delta?.content;
if (content) {
tokenCount++;
buffer += content;
yield {
type: 'token',
content,
metadata: { tokenCount },
};
}
}
} catch (error) {
// Check user-initiated aborts first: the SDK throws APIUserAbortError,
// which extends APIError, so the order of these checks matters
if (error instanceof OpenAI.APIUserAbortError) {
yield { type: 'cancelled', error: 'Stream cancelled by client' };
} else if (error instanceof OpenAI.APIError) {
const message = `OpenAI API Error ${error.status}: ${error.message}`;
yield { type: 'error', error: message };
} else {
yield { type: 'error', error: error instanceof Error ? error.message : 'Unknown error' };
}
}
}
}
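The SDK hides the transport, but it helps to know what it is parsing: OpenAI streams responses as SSE frames where each event is a `data: <json>` line and the sentinel `data: [DONE]` marks the end. A simplified parser for that framing — this sketch assumes it receives whole lines, whereas real code must also buffer partial lines across network chunks:

```typescript
// Simplified parser for OpenAI's streaming wire format: each event is
// a "data: <json>" line; "data: [DONE]" terminates the stream.

export interface ParsedEvent {
  done: boolean;
  json?: any;
}

export function parseSSELine(line: string): ParsedEvent | null {
  const trimmed = line.trim();
  if (!trimmed.startsWith('data:')) return null; // Comments, blank lines
  const payload = trimmed.slice('data:'.length).trim();
  if (payload === '[DONE]') return { done: true };
  try {
    return { done: false, json: JSON.parse(payload) };
  } catch {
    return null; // Malformed frame: skip rather than crash the stream
  }
}
```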
Handling Mid-Stream Errors
Errors mid-stream need special handling: emit an error event the client can render, and offer a retry when the failure is transient.
interface StreamContext {
requestId: string;
startTime: number;
tokens: string[];
lastSuccessfulToken: number;
retryCount: number;
}
class MidStreamErrorHandler {
async handleStreamError(
error: Error,
context: StreamContext,
recoveryFn?: () => Promise<AsyncGenerator<any>>
): Promise<{ recovered: boolean; recovery?: AsyncGenerator<any> }> {
console.error(`Stream error at token ${context.lastSuccessfulToken}:`, error.message);
// Categorize error
const isRetryable = this.isRetryableError(error);
const canRecover = isRetryable && context.retryCount < 3 && recoveryFn !== undefined;
if (canRecover) {
console.log(`Attempting recovery (attempt ${context.retryCount + 1}/3)...`);
context.retryCount++;
try {
const recovered = await recoveryFn!();
return { recovered: true, recovery: recovered };
} catch (recoveryError) {
console.error('Recovery failed:', recoveryError);
return { recovered: false };
}
}
return { recovered: false };
}
private isRetryableError(error: Error): boolean {
const retryableMessages = [
'temporarily unavailable',
'timeout',
'connection reset',
'429', // Rate limit
'500', // Server error
'502', // Bad gateway
'503', // Service unavailable
];
return retryableMessages.some(msg => error.message.toLowerCase().includes(msg.toLowerCase()));
}
}
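Retry attempts for the transient failures that `isRetryableError` identifies should also be spaced out, or a struggling provider just gets hammered harder. A jittered exponential backoff sketch — the base delay, cap, and attempt count below are illustrative defaults, not values from the handler above:

```typescript
// Exponential backoff with full jitter for retrying transient
// mid-stream failures. Base/cap values are illustrative; tune them
// for your provider's rate limits.

export function backoffDelayMs(
  attempt: number, // 0-based retry attempt
  baseMs = 500,
  capMs = 10000,
  random: () => number = Math.random
): number {
  const exp = Math.min(capMs, baseMs * 2 ** attempt); // 500, 1000, 2000, ...
  return Math.floor(random() * exp); // Full jitter: uniform in [0, exp)
}

export async function withBackoff<T>(
  fn: () => Promise<T>,
  maxAttempts = 3
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      await new Promise(r => setTimeout(r, backoffDelayMs(attempt)));
    }
  }
  throw lastError;
}
```

Full jitter (random in `[0, exp)`) spreads simultaneous retries from many clients across the window instead of synchronizing them into thundering herds.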
Content Buffering for Tool Calls
Some LLM responses include tool calls. Buffer content until tool execution completes.
interface BufferedStreamChunk {
type: 'text' | 'tool_call' | 'tool_result';
content: string;
toolName?: string;
toolInput?: Record<string, any>;
toolCallId?: string;
}
class BufferedStreamProcessor {
private buffer: BufferedStreamChunk[] = [];
private currentToolCall: { id: string; name: string; input: string } | null = null;
processChunk(chunk: any): BufferedStreamChunk | null {
// If this is a tool call start, buffer it
if (chunk.type === 'tool_call_start') {
this.currentToolCall = {
id: chunk.toolCallId,
name: chunk.toolName,
input: '',
};
return null; // Don't emit yet
}
// If we're building a tool call, buffer the input
if (this.currentToolCall && chunk.type === 'tool_call_input') {
this.currentToolCall.input += chunk.content;
return null; // Don't emit yet
}
// Tool call complete: emit it
// Tool call complete: emit it
if (chunk.type === 'tool_call_end' && this.currentToolCall) {
let toolInput: Record<string, any> | undefined;
try {
toolInput = JSON.parse(this.currentToolCall.input);
} catch {
toolInput = undefined; // Truncated or malformed tool-call JSON
}
const buffered: BufferedStreamChunk = {
type: 'tool_call',
content: this.currentToolCall.input,
toolName: this.currentToolCall.name,
toolCallId: this.currentToolCall.id,
toolInput,
};
this.currentToolCall = null;
return buffered;
}
// Regular text: emit immediately
if (chunk.type === 'text') {
return {
type: 'text',
content: chunk.content,
};
}
return null;
}
flush(): BufferedStreamChunk[] {
const remaining = this.buffer.splice(0);
if (this.currentToolCall) {
// Incomplete tool call at end of stream
remaining.push({
type: 'tool_call',
content: this.currentToolCall.input,
toolName: this.currentToolCall.name,
toolCallId: this.currentToolCall.id,
});
this.currentToolCall = null;
}
return remaining;
}
}
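The reason buffering matters is that tool-call arguments stream as partial JSON fragments (e.g. `{"city": "Par`), which fail to parse until the call ends. A small helper — the name `safeParseToolInput` is my own, not part of the processor above — makes the "complete vs. still partial" distinction explicit:

```typescript
// Tool-call arguments accumulate as partial JSON, so parsing is only
// safe once the stream signals the call has ended. Returns the parsed
// object when the accumulated input is complete JSON, or null while it
// is still partial or malformed.

export function safeParseToolInput(
  accumulated: string
): Record<string, any> | null {
  try {
    const parsed = JSON.parse(accumulated);
    // Tool inputs are JSON objects; reject primitives and arrays
    return parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed)
      ? parsed
      : null;
  } catch {
    return null; // Still partial, or malformed
  }
}
```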
Client Reconnection and Resumption
Handle client reconnects by tracking progress server-side.
interface StreamSession {
sessionId: string;
userId: string;
createdAt: number;
lastActivityAt: number;
tokensGenerated: string[];
status: 'active' | 'paused' | 'completed' | 'failed';
error?: string;
}
class StreamSessionManager {
private sessions: Map<string, StreamSession> = new Map();
private sessionTimeout = 600000; // 10 minutes
createSession(userId: string): string {
const sessionId = `stream_${userId}_${Date.now()}`;
this.sessions.set(sessionId, {
sessionId,
userId,
createdAt: Date.now(),
lastActivityAt: Date.now(),
tokensGenerated: [],
status: 'active',
});
return sessionId;
}
resumeStream(sessionId: string): { resumed: boolean; session?: StreamSession } {
const session = this.sessions.get(sessionId);
if (!session) {
return { resumed: false };
}
// Check if session expired (measure from last activity, not creation,
// since lastActivityAt is what we keep refreshing)
if (Date.now() - session.lastActivityAt > this.sessionTimeout) {
this.sessions.delete(sessionId);
return { resumed: false };
}
// Resume
session.lastActivityAt = Date.now();
session.status = 'active';
return { resumed: true, session };
}
recordToken(sessionId: string, token: string): boolean {
const session = this.sessions.get(sessionId);
if (!session || session.status !== 'active') {
return false;
}
session.tokensGenerated.push(token);
session.lastActivityAt = Date.now();
return true;
}
completeStream(sessionId: string): void {
const session = this.sessions.get(sessionId);
if (session) {
session.status = 'completed';
session.lastActivityAt = Date.now();
// Delete after 1 hour
setTimeout(() => this.sessions.delete(sessionId), 3600000);
}
}
failStream(sessionId: string, error: string): void {
const session = this.sessions.get(sessionId);
if (session) {
session.status = 'failed';
session.error = error;
session.lastActivityAt = Date.now();
}
}
}
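One gap in the manager above: expired sessions are only deleted lazily when a client tries to resume them, so abandoned sessions accumulate. A periodic sweep bounds that memory. The sketch below is a standalone function over any map of sessions carrying a `lastActivityAt` timestamp, so it can be tested in isolation:

```typescript
// Periodic sweep for abandoned sessions. Collect expired IDs first,
// then delete, so we never mutate the Map mid-iteration.

interface SweepableSession {
  lastActivityAt: number;
}

export function sweepExpiredSessions<T extends SweepableSession>(
  sessions: Map<string, T>,
  timeoutMs: number,
  now: number = Date.now()
): string[] {
  const expired: string[] = [];
  for (const [id, session] of sessions) {
    if (now - session.lastActivityAt > timeoutMs) {
      expired.push(id);
    }
  }
  for (const id of expired) sessions.delete(id);
  return expired; // Useful for logging/metrics
}
```

Wired up with something like `setInterval(() => sweepExpiredSessions(map, 600000), 60000)`, running once a minute against the same 10-minute timeout.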
// Client-side reconnect logic
async function streamWithResumeCapability(
prompt: string,
sessionId?: string
): Promise<string> {
const response = await fetch('/api/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt, sessionId }),
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
let buffer = '';
try {
while (reader) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
for (let i = 0; i < lines.length - 1; i++) {
const line = lines[i];
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
if (data.type === 'error') {
throw new Error(data.message);
} else if (data.type === 'token') {
// Update UI with token
console.log(data.content);
}
}
}
buffer = lines[lines.length - 1];
}
} catch (error) {
console.error('Stream error:', error);
// Attempt a single reconnect with the sessionId. In production, cap
// the retry count and add backoff: as written, a persistent failure
// would recurse indefinitely.
if (sessionId) {
console.log('Reconnecting to stream...');
return streamWithResumeCapability(prompt, sessionId);
}
throw error;
}
return 'Stream completed';
}
TTFB Optimization
Time To First Byte is critical for perceived performance. Minimize latency to first token.
interface TTFBMetrics {
requestTime: number;
dnsTime: number;
tcpConnectTime: number;
tlsTime: number;
firstTokenTime: number;
p50TTFB: number;
p95TTFB: number;
p99TTFB: number;
}
class TTFBOptimizer {
private measurements: number[] = [];
async optimizeTTFB(
prompt: string,
model: string
): Promise<{
firstToken: string;
ttfbMs: number;
}> {
const requestStart = performance.now(); // Overall request start, for end-to-end metrics
// 1. Parallelize lightweight preprocessing with any auxiliary work
const [normalizedPrompt] = await Promise.all([
this.normalizePrompt(prompt),
this.preComputeEmbedding(prompt), // Optional: use later for re-ranking
]);
// 2. Stream response immediately, don't wait for full computation
let firstTokenReceived = false;
let firstToken = '';
const streamStart = performance.now();
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const stream = await openai.chat.completions.create({
model,
messages: [{ role: 'user', content: normalizedPrompt }],
stream: true,
});
for await (const chunk of stream) {
if (!firstTokenReceived && chunk.choices[0]?.delta?.content) {
firstTokenReceived = true;
firstToken = chunk.choices[0].delta.content;
const ttfbMs = performance.now() - streamStart;
this.measurements.push(ttfbMs);
return {
firstToken,
ttfbMs,
};
}
}
throw new Error('No tokens received from LLM');
}
private async normalizePrompt(prompt: string): Promise<string> {
// Lightweight preprocessing
return prompt.trim();
}
private async preComputeEmbedding(prompt: string): Promise<number[]> {
// Optional: pre-compute embedding for re-ranking candidates
return [];
}
getTTFBStats(): { p50: number; p95: number; p99: number } {
if (this.measurements.length === 0) {
return { p50: 0, p95: 0, p99: 0 };
}
const sorted = [...this.measurements].sort((a, b) => a - b);
return {
p50: sorted[Math.floor(sorted.length * 0.5)],
p95: sorted[Math.floor(sorted.length * 0.95)],
p99: sorted[Math.floor(sorted.length * 0.99)],
};
}
}
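Note that `measurements` in the optimizer above grows without bound in a long-lived process. A fixed-capacity ring buffer keeps percentile stats over only the most recent N samples — a sketch of that variant (the class name and default capacity are my own choices):

```typescript
// Bounded alternative to an ever-growing measurements array: a ring
// buffer that computes nearest-rank percentiles over the last N samples.

export class RollingPercentiles {
  private samples: number[];
  private next = 0;   // Next write position (wraps around)
  private filled = 0; // How many slots hold real samples

  constructor(private capacity: number = 1000) {
    this.samples = new Array(capacity);
  }

  record(valueMs: number): void {
    this.samples[this.next] = valueMs;
    this.next = (this.next + 1) % this.capacity;
    this.filled = Math.min(this.filled + 1, this.capacity);
  }

  percentile(p: number): number {
    if (this.filled === 0) return 0;
    const sorted = this.samples.slice(0, this.filled).sort((a, b) => a - b);
    // Nearest-rank index, clamped to the last element
    const idx = Math.min(sorted.length - 1, Math.floor(sorted.length * p));
    return sorted[idx];
  }
}
```

Sorting on every read is fine at this scale; swap in a streaming sketch (e.g. t-digest) only if you are reading percentiles at high frequency.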
Abort Controller Pattern
Graceful cancellation when client disconnects or timeout occurs.
class StreamAbortManager {
private abortControllers: Map<string, AbortController> = new Map();
private timeouts: Map<string, NodeJS.Timeout> = new Map();
createStream(streamId: string, timeoutMs: number = 300000): AbortController {
const controller = new AbortController();
this.abortControllers.set(streamId, controller);
// Auto-abort after timeout
const timeout = setTimeout(() => {
console.log(`Stream ${streamId} timeout after ${timeoutMs}ms`);
controller.abort();
}, timeoutMs);
this.timeouts.set(streamId, timeout);
return controller;
}
abort(streamId: string, reason: string = 'Manual abort'): void {
const controller = this.abortControllers.get(streamId);
if (controller) {
console.log(`Aborting stream ${streamId}: ${reason}`);
controller.abort();
}
this.cleanup(streamId);
}
cleanup(streamId: string): void {
this.abortControllers.delete(streamId);
const timeout = this.timeouts.get(streamId);
if (timeout) {
clearTimeout(timeout);
this.timeouts.delete(streamId);
}
}
}
// Usage
const abortManager = new StreamAbortManager();
app.post('/api/stream', async (req: Request, res: Response) => {
const streamId = `stream_${Date.now()}`;
const controller = abortManager.createStream(streamId, 300000); // 5 min timeout
// This simpler route still needs SSE headers before the first write
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
try {
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const stream = await openai.chat.completions.create(
{
model: 'gpt-4-turbo-preview',
messages: req.body.messages,
stream: true,
},
{ signal: controller.signal } // Request options argument, not a body parameter
);
for await (const chunk of stream) {
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
res.end();
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
res.write(`data: ${JSON.stringify({ type: 'abort' })}\n\n`);
} else {
const message = error instanceof Error ? error.message : String(error);
res.write(`data: ${JSON.stringify({ type: 'error', message })}\n\n`);
}
res.end();
} finally {
abortManager.cleanup(streamId);
}
});
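On Node 20.3+ (and modern browsers), the platform primitives `AbortSignal.timeout()` and `AbortSignal.any()` can replace the manual timeout map above entirely: the combined signal aborts when either the client disconnects or the deadline passes, and the timer needs no manual cleanup. A sketch, assuming a runtime where both static methods are available:

```typescript
// Combine a client-disconnect signal with a deadline using platform
// primitives (Node 20.3+). The returned signal aborts as soon as
// EITHER source aborts; no setTimeout/clearTimeout bookkeeping needed.

export function streamSignal(
  clientSignal: AbortSignal,
  timeoutMs = 300000 // 5 min, matching the manager above
): AbortSignal {
  return AbortSignal.any([clientSignal, AbortSignal.timeout(timeoutMs)]);
}
```

Used as `openai.chat.completions.create(body, { signal: streamSignal(controller.signal) })`; keep the `StreamAbortManager` approach if you must support older Node versions.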
Streaming LLM Implementation Checklist
- Implement SSE with proper headers and connection handling
- Handle client disconnect (close event) to stop streaming
- Implement backpressure: check the write() return value
- Wait for the drain event when write() returns false
- Stream directly from OpenAI API without buffering
- Handle mid-stream errors with recovery logic
- Buffer tool calls until execution completes
- Implement session tracking for reconnections
- Minimize TTFB with parallel preprocessing
- Use AbortController for graceful cancellation
- Set and enforce stream timeouts
- Monitor TTFB percentiles (p50/p95/p99)
Conclusion
Streaming transforms LLM UX by showing progress instantly. Production implementations require handling backpressure, buffering for tool calls, recovering from mid-stream errors, supporting reconnection, and minimizing TTFB. Master these patterns and your LLM API will feel as responsive as a local model.