LangGraph in Production — Stateful AI Agents With Checkpointing and Human-in-the-Loop
Author: Sanjeev Sharma (@webcoderspeed1)
Introduction
LangGraph is LangChain's framework for building stateful AI agents as directed graphs. Unlike simple prompt chains, LangGraph manages complex workflows with conditional routing, state persistence, and human interruption points. This post covers graph design, checkpointing to databases, resuming interrupted workflows, and production deployment.
- Understanding LangGraph: State Machines for AI
- Conditional Routing
- Checkpointing to PostgreSQL
- Human-in-the-Loop Interrupts
- Streaming State Updates
- Production Deployment
- Checklist
- Conclusion
Understanding LangGraph: State Machines for AI
LangGraph models agent workflows as state machines where:
- Nodes are functions that process state
- Edges define transitions (can be conditional)
- State is a schema defining all data in the workflow
import {
Annotation,
StateGraph,
START,
END,
} from '@langchain/langgraph';
import Anthropic from '@anthropic-ai/sdk';
// Define the state schema
const AgentState = Annotation.Root({
messages: Annotation<{ role: 'user' | 'assistant'; content: string }[]>,
research: Annotation<string>,
analysis: Annotation<string>,
final_response: Annotation<string>,
});
const client = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
// Node 1: Research
async function researchNode(state: typeof AgentState.State) {
const messages = state.messages;
const response = await client.messages.create({
model: 'claude-opus-4-1',
max_tokens: 2048,
system: 'You are a research analyst. Gather information on the topic.',
messages: messages.map((m) => ({
role: m.role,
content: m.content,
})) as Parameters<typeof client.messages.create>[0]['messages'],
});
const research =
response.content[0].type === 'text' ? response.content[0].text : '';
return {
...state,
research,
messages: [
...state.messages,
{ role: 'assistant', content: `Research complete: ${research}` },
],
};
}
// Node 2: Analysis
async function analysisNode(state: typeof AgentState.State) {
const response = await client.messages.create({
model: 'claude-opus-4-1',
max_tokens: 1024,
system: 'You are an analyst. Provide insights based on the research.',
messages: [
{
role: 'user',
content: `Analyse this research: ${state.research}`,
},
],
});
const analysis =
response.content[0].type === 'text' ? response.content[0].text : '';
return {
...state,
analysis,
};
}
// Node 3: Response generation
async function responseNode(state: typeof AgentState.State) {
const response = await client.messages.create({
model: 'claude-opus-4-1',
max_tokens: 1024,
system: 'You are a communicator. Write a clear, concise response.',
messages: [
{
role: 'user',
content: `Research: ${state.research}\nAnalysis: ${state.analysis}\nProvide final response.`,
},
],
});
const finalResponse =
response.content[0].type === 'text' ? response.content[0].text : '';
return {
...state,
final_response: finalResponse,
};
}
// Build the graph
const graph = new StateGraph(AgentState)
.addNode('research', researchNode)
.addNode('analysis', analysisNode)
.addNode('response', responseNode)
.addEdge(START, 'research')
.addEdge('research', 'analysis')
.addEdge('analysis', 'response')
.addEdge('response', END);
const agent = graph.compile();
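To make the execution model concrete, here is a deliberately simplified, hypothetical mini-runner — not LangGraph internals — that shows the loop a compiled linear graph performs: run the current node on the state, then follow the outgoing edge.

```typescript
// Hypothetical mini-runner illustrating the execute-node-then-follow-edge
// loop of a compiled linear graph. This is NOT LangGraph's implementation.
type Node<S> = (state: S) => S;

interface MiniGraph<S> {
  nodes: Record<string, Node<S>>;
  edges: Record<string, string>; // source -> target; '__end__' terminates
}

function runMiniGraph<S>(graph: MiniGraph<S>, entry: string, state: S): S {
  let current = entry;
  while (current !== '__end__') {
    state = graph.nodes[current](state); // node transforms the state
    current = graph.edges[current]; // edge selects the next node
  }
  return state;
}

// Toy pipeline mirroring research -> analysis -> response
const demo: MiniGraph<{ log: string[] }> = {
  nodes: {
    research: (s) => ({ log: [...s.log, 'research'] }),
    analysis: (s) => ({ log: [...s.log, 'analysis'] }),
    response: (s) => ({ log: [...s.log, 'response'] }),
  },
  edges: { research: 'analysis', analysis: 'response', response: '__end__' },
};

const result = runMiniGraph(demo, 'research', { log: [] });
// result.log records the nodes in execution order
```

The real compiled agent does far more (partial state updates, reducers, streaming), but the node/edge traversal above is the mental model to keep.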
Conditional Routing
Route nodes based on state:
const ConditionalStateGraph = new StateGraph(AgentState)
.addNode('research', researchNode)
.addNode('analysis', analysisNode)
.addNode('fact_check', factCheckNode)
.addNode('response', responseNode)
.addEdge(START, 'research')
.addConditionalEdges(
'research',
// Router function: examines state and returns next node
(state: typeof AgentState.State) => {
// If research is <100 chars, fact-check
if (state.research.length < 100) {
return 'fact_check';
}
// Otherwise, proceed to analysis
return 'analysis';
},
{
fact_check: 'fact_check',
analysis: 'analysis',
}
)
.addEdge('fact_check', 'analysis')
.addEdge('analysis', 'response')
.addEdge('response', END);
const conditionalAgent = ConditionalStateGraph.compile();
async function factCheckNode(state: typeof AgentState.State) {
const response = await client.messages.create({
model: 'claude-opus-4-1',
max_tokens: 512,
system: 'Verify the accuracy of this research.',
messages: [
{
role: 'user',
content: `Fact-check this: ${state.research}`,
},
],
});
return {
...state,
research: (response.content[0].type === 'text' ? response.content[0].text : '') + '\n[Fact-checked]',
};
}
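Because the router passed to addConditionalEdges is an ordinary function of state, its branching logic can be exercised in isolation, with no model call. A standalone copy of the rule used above:

```typescript
// Standalone copy of the routing rule from addConditionalEdges above:
// research output under 100 characters is routed through a fact-check pass.
function routeAfterResearch(research: string): 'fact_check' | 'analysis' {
  return research.length < 100 ? 'fact_check' : 'analysis';
}

const short = routeAfterResearch('Too brief.'); // routes to fact_check
const long = routeAfterResearch('x'.repeat(250)); // routes to analysis
```

Keeping routers pure like this makes them trivially unit-testable, which matters once routing decisions gate expensive model calls.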
Checkpointing to PostgreSQL
Persist state so workflows can be resumed:
import { Client } from 'pg';
interface CheckpointData {
thread_id: string;
checkpoint_id: string;
node_id: string;
state: string;
timestamp: Date;
created_at: Date;
}
class PostgresCheckpointer {
private db: Client;
constructor(connectionString: string) {
this.db = new Client({ connectionString });
}
async connect(): Promise<void> {
await this.db.connect();
// Create table if not exists
await this.db.query(`
CREATE TABLE IF NOT EXISTS graph_checkpoints (
thread_id TEXT NOT NULL,
checkpoint_id TEXT NOT NULL,
node_id TEXT NOT NULL,
state JSONB NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (thread_id, checkpoint_id)
);
CREATE INDEX IF NOT EXISTS idx_checkpoints_thread
ON graph_checkpoints(thread_id, timestamp DESC);
`);
}
async saveCheckpoint(
threadId: string,
checkpointId: string,
nodeId: string,
state: typeof AgentState.State
): Promise<void> {
await this.db.query(
`INSERT INTO graph_checkpoints
(thread_id, checkpoint_id, node_id, state, timestamp)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (thread_id, checkpoint_id)
DO UPDATE SET state = $4, timestamp = $5`,
[threadId, checkpointId, nodeId, JSON.stringify(state), new Date()]
);
}
async loadCheckpoint(
threadId: string,
checkpointId?: string
): Promise<CheckpointData | null> {
const query = checkpointId
? `SELECT * FROM graph_checkpoints
WHERE thread_id = $1 AND checkpoint_id = $2`
: `SELECT * FROM graph_checkpoints
WHERE thread_id = $1
ORDER BY timestamp DESC LIMIT 1`;
const params = checkpointId ? [threadId, checkpointId] : [threadId];
const result = await this.db.query(query, params);
if (result.rows.length === 0) {
return null;
}
const row = result.rows[0];
return {
thread_id: row.thread_id,
checkpoint_id: row.checkpoint_id,
node_id: row.node_id,
state: JSON.stringify(row.state), // pg parses JSONB columns into objects; re-serialise to match the string-typed interface
timestamp: row.timestamp,
created_at: row.created_at,
};
}
async disconnect(): Promise<void> {
await this.db.end();
}
}
const checkpointer = new PostgresCheckpointer(
process.env.DATABASE_URL || 'postgres://localhost/langgraph'
);
await checkpointer.connect();
// Wrap agent with checkpointing
async function runWithCheckpoints(
threadId: string,
input: { messages: { role: 'user' | 'assistant'; content: string }[] }
) {
let state = input;
// Load previous checkpoint if exists
const checkpoint = await checkpointer.loadCheckpoint(threadId);
if (checkpoint) {
state = JSON.parse(checkpoint.state);
console.log(`Resuming from checkpoint at node: ${checkpoint.node_id}`);
}
// Create a checkpoint before running
const checkpointId = `cp-${Date.now()}`;
// Run the agent. Checkpoints are written manually below, so nothing
// needs to be wired into the compiled graph; note that stream()
// returns a promise that must be awaited before iterating
const stream = await agent.stream(state);
for await (const event of stream) {
const [nodeId, nodeState] = Object.entries(event)[0] as [
string,
typeof AgentState.State
];
if (nodeState) {
await checkpointer.saveCheckpoint(threadId, checkpointId, nodeId, nodeState);
console.log(`Checkpoint saved at node: ${nodeId}`);
}
}
}
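One detail the loop above glosses over: on resume you may want to skip nodes that already completed. A hypothetical helper (not part of LangGraph) that derives the remaining work from the checkpointed node_id and the graph's fixed node order:

```typescript
// Hypothetical resume helper: given the linear node order and the last
// checkpointed node, return the nodes that still need to run.
const NODE_ORDER = ['research', 'analysis', 'response'] as const;

function remainingNodes(
  order: readonly string[],
  lastCompleted: string | null
): string[] {
  if (lastCompleted === null) return [...order]; // fresh run: everything
  const idx = order.indexOf(lastCompleted);
  if (idx === -1) return [...order]; // unknown node id: rerun everything
  return order.slice(idx + 1); // only the nodes after the checkpoint
}

const rest = remainingNodes(NODE_ORDER, 'research');
// rest contains the nodes still to run after the 'research' checkpoint
```

For branching graphs this gets more involved (the checkpoint must also record which branch was taken), which is one reason to eventually adopt a LangGraph-native checkpointer rather than a hand-rolled one.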
Human-in-the-Loop Interrupts
Pause workflows for human review:
import { Annotation, StateGraph, START, END } from '@langchain/langgraph';
const InterruptibleState = Annotation.Root({
messages: Annotation<{ role: 'user' | 'assistant'; content: string }[]>,
draft_response: Annotation<string>,
approved: Annotation<boolean | null>,
final_response: Annotation<string>,
});
async function generateDraftNode(state: typeof InterruptibleState.State) {
const response = await client.messages.create({
model: 'claude-opus-4-1',
max_tokens: 2048,
messages: state.messages as Parameters<typeof client.messages.create>[0]['messages'],
});
return {
...state,
draft_response:
response.content[0].type === 'text' ? response.content[0].text : '',
};
}
function shouldApproveNode(state: typeof InterruptibleState.State) {
// If approved is null, interrupt workflow
if (state.approved === null) {
return 'interrupt';
}
// If human approved, finalize
if (state.approved === true) {
return 'finalize';
}
// If human rejected, regenerate
return 'regenerate';
}
async function finalizeNode(state: typeof InterruptibleState.State) {
return {
...state,
final_response: state.draft_response,
};
}
async function regenerateNode(state: typeof InterruptibleState.State) {
const response = await client.messages.create({
model: 'claude-opus-4-1',
max_tokens: 2048,
messages: [
...state.messages,
{
role: 'user',
content: 'Human rejected the previous response. Try a different approach.',
},
] as Parameters<typeof client.messages.create>[0]['messages'],
});
return {
...state,
draft_response:
response.content[0].type === 'text' ? response.content[0].text : '',
approved: null, // Reset for re-review
};
}
const interruptibleGraph = new StateGraph(InterruptibleState)
.addNode('draft', generateDraftNode)
.addNode('finalize', finalizeNode)
.addNode('regenerate', regenerateNode)
.addEdge(START, 'draft')
.addConditionalEdges('draft', shouldApproveNode, {
interrupt: END,
finalize: 'finalize',
regenerate: 'regenerate',
})
.addConditionalEdges('regenerate', shouldApproveNode, {
interrupt: END,
finalize: 'finalize',
regenerate: 'regenerate',
})
.addEdge('finalize', END);
// The conditional edge to END is what pauses the run: when `approved`
// is still null, the graph stops after drafting. (LangGraph's built-in
// interruptBefore option is an alternative, but it requires compiling
// with a checkpointer.)
const interruptibleAgent = interruptibleGraph.compile();
// Usage with human approval
async function runWithApproval(threadId: string, input: typeof InterruptibleState.State) {
// stream() returns a promise; thread_id belongs under `configurable`
const stream = await interruptibleAgent.stream(input, {
configurable: { thread_id: threadId },
});
for await (const event of stream) {
const [nodeId, nodeState] = Object.entries(event)[0] as [
string,
typeof InterruptibleState.State
];
if (nodeId === 'draft') {
console.log('Draft response:');
console.log(nodeState.draft_response);
console.log('\nWaiting for human approval...');
// In a real app, persist the draft, collect the reviewer's decision,
// then re-invoke the graph with `approved` set in the input state.
// Mutating the streamed snapshot only works for this demo:
nodeState.approved = true;
}
}
}
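Like the research router earlier, shouldApproveNode is pure, so its three-way decision can be checked directly. A self-contained copy of the rule:

```typescript
// Self-contained copy of the approval routing rule used above.
function routeApproval(
  approved: boolean | null
): 'interrupt' | 'finalize' | 'regenerate' {
  if (approved === null) return 'interrupt'; // pause for human review
  if (approved === true) return 'finalize'; // human approved the draft
  return 'regenerate'; // human rejected the draft
}

const pending = routeApproval(null); // 'interrupt'
const accepted = routeApproval(true); // 'finalize'
const rejected = routeApproval(false); // 'regenerate'
```

The tri-state `boolean | null` is doing real work here: null means "no human has answered yet", which is distinct from an explicit rejection.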
Streaming State Updates
Stream intermediate results:
async function streamingAgent(
input: typeof AgentState.State,
onUpdate: (node: string, state: typeof AgentState.State) => void
) {
const stream = await agent.stream(input); // stream() returns a promise
for await (const event of stream) {
const [nodeId, nodeState] = Object.entries(event)[0] as [
string,
typeof AgentState.State
];
if (nodeState) {
onUpdate(nodeId, nodeState);
}
}
}
// Stream to client (Express types assumed for req/res)
import type { Request, Response } from 'express';
async function handleAgentRequest(req: Request, res: Response) {
const { messages } = req.body;
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
await streamingAgent(
{ messages, research: '', analysis: '', final_response: '' },
(nodeId, state) => {
res.write(
`event: state-update\ndata: ${JSON.stringify({ nodeId, state })}\n\n`
);
}
);
res.end();
}
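The server-sent-events framing above is easy to get subtly wrong: each event is `event:` and `data:` lines terminated by a blank line. A small pair of hypothetical helpers (not part of any library) for producing and consuming that exact framing:

```typescript
// Hypothetical SSE helpers matching the framing used in handleAgentRequest:
// 'event: <name>\ndata: <json>\n\n' — the trailing blank line ends the event.
function formatSSE(event: string, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

function parseSSE(chunk: string): { event: string; data: unknown } | null {
  const eventMatch = chunk.match(/^event: (.+)$/m);
  const dataMatch = chunk.match(/^data: (.+)$/m);
  if (!eventMatch || !dataMatch) return null; // incomplete frame
  return { event: eventMatch[1], data: JSON.parse(dataMatch[1]) };
}

const wire = formatSSE('state-update', { nodeId: 'research' });
const parsed = parseSSE(wire);
```

In the browser you would normally let EventSource do the parsing; a helper like parseSSE is mainly useful for tests or non-browser consumers.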
Production Deployment
Deploy LangGraph as an API service:
import express, { Express } from 'express';
import { v4 as uuidv4 } from 'uuid';
const app: Express = express();
app.use(express.json());
interface ThreadMetadata {
threadId: string;
createdAt: Date;
lastUpdated: Date;
status: 'running' | 'paused' | 'completed';
}
const threads = new Map<string, ThreadMetadata>();
// Start a new workflow
app.post('/workflows', async (req, res) => {
const { input } = req.body;
const threadId = uuidv4();
threads.set(threadId, {
threadId,
createdAt: new Date(),
lastUpdated: new Date(),
status: 'running',
});
res.json({ threadId });
// Run agent asynchronously
setImmediate(() => runWithCheckpoints(threadId, input));
});
// Get workflow status
app.get('/workflows/:threadId', async (req, res) => {
const metadata = threads.get(req.params.threadId);
if (!metadata) {
return res.status(404).json({ error: 'Thread not found' });
}
const checkpoint = await checkpointer.loadCheckpoint(req.params.threadId);
res.json({
threadId: metadata.threadId,
status: metadata.status,
lastCheckpoint: checkpoint
? {
nodeId: checkpoint.node_id,
timestamp: checkpoint.timestamp,
}
: null,
});
});
// Resume workflow after human approval
app.post('/workflows/:threadId/approve', async (req, res) => {
const { approved } = req.body;
const threadId = req.params.threadId;
// Update state in Redis or DB to signal approval
// Re-run agent from last checkpoint
res.json({ threadId, approved });
});
app.listen(3000, () => {
console.log('LangGraph API running on port 3000');
});
Checklist
- Design state schema and graph structure
- Build nodes and conditional routing logic
- Implement checkpointing to PostgreSQL
- Add human-in-the-loop interrupt points
- Stream state updates to clients
- Deploy as an API service
- Add error recovery and retries
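For that last checklist item, a common pattern is to wrap each node function in retries with exponential backoff before adding it to the graph. A sketch — the delay schedule and attempt count are illustrative assumptions, not values LangGraph prescribes:

```typescript
// Sketch of a retry wrapper for node functions. Base delay, cap, and
// attempt count are assumptions to tune for your workload.
function backoffDelayMs(attempt: number, baseMs = 500, capMs = 10_000): number {
  return Math.min(capMs, baseMs * 2 ** attempt); // 500, 1000, 2000, ... capped
}

function withRetries<S>(
  node: (state: S) => Promise<S>,
  maxAttempts = 3
): (state: S) => Promise<S> {
  return async (state: S) => {
    let lastError: unknown;
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return await node(state); // success: pass the new state through
      } catch (err) {
        lastError = err; // remember the failure and back off
        await new Promise((r) => setTimeout(r, backoffDelayMs(attempt)));
      }
    }
    throw lastError; // exhausted retries: surface the last error
  };
}

// Usage sketch: .addNode('research', withRetries(researchNode))
```

Because the wrapper has the same `(state) => Promise<state>` shape as a node, it composes transparently with addNode.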
Conclusion
LangGraph transforms AI workflows from simple chains into sophisticated state machines. Checkpointing enables long-running workflows that survive interruptions. Human-in-the-loop patterns make agents safer by keeping humans in control. Start with a simple graph, add checkpointing, then introduce conditional routing and interrupts. As your workflows grow more complex, LangGraph's primitives scale with you.