Most posts about agent memory teach you the tools — how to configure a checkpointer, what store.put() does, when to use Mem0 vs Zep. That's useful, but it answers the wrong question first.
The question that should come before all of those is: what memory architecture does my agent pattern actually need?
Anthropic's "Building Effective Agents" defines five workflow patterns — prompt chaining, routing, parallelization, orchestrator-workers, and evaluator-optimizer — plus fully autonomous agents. It's the clearest taxonomy of agentic system design that exists. But memory gets exactly one sentence: it's listed as one of three augmentations to the base LLM, alongside retrieval and tools. No design guidance. No tradeoffs.
LangGraph's official docs cover memory precisely. Two primitives: the Checkpointer for short-term thread-scoped state, and the BaseStore for long-term cross-thread memory. But the docs don't tell you which patterns need which, or how to compose them when your system combines multiple patterns.
That gap is what this post fills. After building agents that span all five of Anthropic's patterns — procurement matching, inventory forecasting, RevOps automation, compliance monitoring — here's the design framework I use to decide memory architecture before writing a line of code.
The Two LangGraph Memory Primitives
Before the framework, you need a precise understanding of what each primitive actually does. Conflating them is the most common design mistake.
Checkpointer: Short-Term, Thread-Scoped
The Checkpointer saves a complete snapshot of your graph state after every super-step, meaning each round of node execution (nodes that run in parallel share one super-step). It's scoped to a thread_id. Same thread ID, same state. Different thread ID, independent state.
from langgraph.checkpoint.memory import InMemorySaver # dev only
from langgraph.checkpoint.postgres import PostgresSaver # production
from langgraph.graph import StateGraph, MessagesState, START
# Development
checkpointer = InMemorySaver()
# Production — call checkpointer.setup() once on first use
# checkpointer = PostgresSaver.from_conn_string(DB_URI)
builder = StateGraph(MessagesState)
# ... add nodes and edges ...
graph = builder.compile(checkpointer=checkpointer)
# thread_id scopes the conversation
config = {"configurable": {"thread_id": "session-abc-123"}}
graph.invoke({"messages": [{"role": "user", "content": "hello"}]}, config)

What the checkpointer gives you:
- Multi-turn conversation continuity within a session
- Crash recovery: resume exactly where you left off
- Human-in-the-loop via interrupt(): state survives the pause
- Time travel: graph.get_state_history(config) lets you replay any prior checkpoint
- One limit: it resets when thread_id changes. A returning user with a new session gets no memory of the old one.
BaseStore: Long-Term, Cross-Thread
The BaseStore persists JSON documents across all threads for a given namespace. It survives session restarts. A user returning next week with a completely new thread_id can still access their stored preferences.
from langgraph.store.memory import InMemoryStore # dev only
from langgraph.store.postgres import PostgresStore # production
from langgraph.store.base import BaseStore
from langchain_core.runnables import RunnableConfig
import uuid
# Development
store = InMemoryStore()
# Compile the graph with BOTH checkpointer and store
graph = builder.compile(checkpointer=checkpointer, store=store)
# Invoke with both thread_id (session scope) and user_id (user scope)
config = {
    "configurable": {
        "thread_id": "session-abc-123",
        "user_id": "user-456",
    }
}

Inside a node, LangGraph auto-injects the store as a keyword argument:

def agent_node(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    # Namespace: (user_id, category). Think of it as a folder path.
    namespace = (user_id, "preferences")
    # Read
    memories = store.search(namespace, query=state["messages"][-1].content, limit=3)
    context = "\n".join([m.value["content"] for m in memories])
    # Write
    store.put(namespace, str(uuid.uuid4()), {"content": "User prefers concise responses"})
    # Use context
    system = f"User preferences:\n{context}" if context else "No prior context."
    response = llm.invoke([{"role": "system", "content": system}] + state["messages"])
    return {"messages": [response]}

The key distinction: thread_id is a session identifier. user_id is a person identifier. They serve different purposes and both belong in config["configurable"].
| | Checkpointer | BaseStore |
|---|---|---|
| Scope | Single thread | All threads |
| Survives session end | Yes (with PostgresSaver) | Yes |
| Survives thread_id change | No | Yes |
| What it stores | Full graph state snapshot | JSON documents, namespaced |
| Access pattern | Automatic (LangGraph handles it) | Explicit store.put() / store.search() |
| Primary use | Conversation history, execution state | User facts, preferences, learned knowledge |
| Dev backend | InMemorySaver | InMemoryStore |
| Prod backend | PostgresSaver | PostgresStore |
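To make the scoping difference concrete, here is a minimal stdlib-only sketch. ToyCheckpointer and ToyStore are hypothetical stand-ins written for this illustration, not LangGraph classes; they only mimic the two scopes (one snapshot per thread_id, versus documents keyed by a namespace tuple).

```python
from collections import defaultdict


class ToyCheckpointer:
    """Hypothetical stand-in: one state snapshot per thread_id."""

    def __init__(self):
        self._snapshots = {}

    def save(self, thread_id, state):
        self._snapshots[thread_id] = dict(state)

    def load(self, thread_id):
        # An unknown thread_id starts from empty state
        return dict(self._snapshots.get(thread_id, {}))


class ToyStore:
    """Hypothetical stand-in: documents grouped by namespace, shared by all threads."""

    def __init__(self):
        self._items = defaultdict(dict)

    def put(self, namespace, key, value):
        self._items[namespace][key] = value

    def search(self, namespace):
        return list(self._items[namespace].values())


checkpointer_demo = ToyCheckpointer()
store_demo = ToyStore()

# First session: conversation state and one long-term preference
checkpointer_demo.save("session-1", {"messages": ["hello"]})
store_demo.put(("user-456", "preferences"), "p1", {"content": "User prefers concise responses"})

# Same user returns with a new thread_id
new_session_state = checkpointer_demo.load("session-2")
new_session_memories = store_demo.search(("user-456", "preferences"))
```

The new session sees no conversation state, but the preference written under the user's namespace is still there. That is the behavior the table describes.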
Now the framework.
The Framework: Matching Anthropic's Patterns to Memory Architecture
Pattern 1: Prompt Chaining
What it is: A fixed sequence of LLM calls where each step processes the output of the previous one. Optionally with programmatic gates between steps.
When Anthropic recommends it: Tasks that decompose cleanly into fixed subtasks. Trade latency for higher accuracy by making each LLM call easier. Examples: draft → translate → check, or outline → validate → write.
Memory architecture: Checkpointer only.
Prompt chaining is stateless between runs by design. Each invocation is a new pipeline. The value is within a single execution — passing outputs between steps is just state flowing through the graph, which the checkpointer handles automatically.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
class ChainState(TypedDict):
    input: str
    draft: str
    translated: str
    final: str

def draft_node(state: ChainState):
    draft = llm.invoke(f"Write marketing copy for: {state['input']}")
    return {"draft": draft.content}

def translate_node(state: ChainState):
    translated = llm.invoke(f"Translate to Italian: {state['draft']}")
    return {"translated": translated.content}

def gate_node(state: ChainState):
    # Programmatic check: does the translation look right?
    # Could be a rule, another LLM call, or an API check
    if len(state["translated"]) < 10:
        raise ValueError("Translation too short, retry")
    return {}

def finalize_node(state: ChainState):
    final = llm.invoke(f"Add Italian legal disclaimer to: {state['translated']}")
    return {"final": final.content}

builder = StateGraph(ChainState)
builder.add_node("draft", draft_node)
builder.add_node("translate", translate_node)
builder.add_node("gate", gate_node)
builder.add_node("finalize", finalize_node)
builder.add_edge(START, "draft")
builder.add_edge("draft", "translate")
builder.add_edge("translate", "gate")
builder.add_edge("gate", "finalize")
builder.add_edge("finalize", END)
# Checkpointer only — no store needed
# Enables crash recovery and human-in-the-loop at any gate
graph = builder.compile(checkpointer=checkpointer)

Why no store: Each chain run is independent. There's no user-specific knowledge that needs to survive between runs. If you're chaining for the same user repeatedly and want to carry context (e.g., their style preferences), add a store. That's a product decision, not an architecture requirement.
Pattern 2: Routing
What it is: A classifier LLM routes an input to a specialized downstream handler. Different inputs get different prompts, tools, and models.
When Anthropic recommends it: Complex tasks with distinct categories better handled separately. Examples: support tickets routed to billing, technical, or general agents; easy queries routed to Haiku, hard queries to Sonnet.
Memory architecture: Checkpointer + optional store for user context that informs routing.
The routing decision itself is stateless. But routing agents often need user context to classify correctly — "is this a billing question?" depends on what plan the user is on, which is stored knowledge.
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.runnables import RunnableConfig
from langgraph.store.base import BaseStore
class RouterState(MessagesState):
    route: str  # "billing" | "technical" | "general"

def classify_node(state: RouterState, config: RunnableConfig, *, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    namespace = (user_id, "account")
    # Retrieve user context to inform routing
    account_info = store.search(namespace, query="subscription plan", limit=1)
    plan = account_info[0].value.get("plan", "unknown") if account_info else "unknown"
    last_message = state["messages"][-1].content
    prompt = f"""Classify this support message. User is on {plan} plan.
Message: {last_message}
Respond with exactly one word: billing, technical, or general."""
    result = llm.invoke(prompt)
    return {"route": result.content.strip().lower()}

def route_decision(state: RouterState) -> Literal["billing", "technical", "general"]:
    # Fall back to "general" if the classifier returns anything unexpected
    route = state["route"]
    return route if route in ("billing", "technical", "general") else "general"

def billing_node(state: RouterState):
    response = llm_sonnet.invoke(
        [{"role": "system", "content": "You are a billing specialist."}]
        + state["messages"]
    )
    return {"messages": [response]}

def technical_node(state: RouterState):
    response = llm_sonnet.invoke(
        [{"role": "system", "content": "You are a technical support engineer."}]
        + state["messages"]
    )
    return {"messages": [response]}

def general_node(state: RouterState):
    # Cheap model for simple queries
    response = llm_haiku.invoke(state["messages"])
    return {"messages": [response]}

builder = StateGraph(RouterState)
builder.add_node("classify", classify_node)
builder.add_node("billing", billing_node)
builder.add_node("technical", technical_node)
builder.add_node("general", general_node)
builder.add_edge(START, "classify")
builder.add_conditional_edges("classify", route_decision)
builder.add_edge("billing", END)
builder.add_edge("technical", END)
builder.add_edge("general", END)
graph = builder.compile(checkpointer=checkpointer, store=store)

Key design note: The store is read-only in this pattern at the routing step. Writing to the store (e.g., updating the user's account info) happens in downstream handlers, not in the classifier. Keep the router fast and cheap.
Pattern 3: Parallelization
What it is: Multiple LLM calls run simultaneously. Two variants: sectioning (divide a task into parallel independent subtasks) and voting (run the same task multiple times for consensus).
When Anthropic recommends it: When subtasks are independent and can run in parallel for speed, or when multiple perspectives improve confidence. Examples: guardrails check running alongside the main response; multiple reviewers flagging code vulnerabilities.
Memory architecture: Checkpointer only, with careful state design.
Parallelization doesn't need cross-thread memory. The challenge is state management when parallel branches write back to shared state — you need reducers to merge results correctly.
from typing import TypedDict, Annotated
import operator
from langgraph.graph import StateGraph, START, END
class ParallelState(TypedDict):
    input: str
    # Annotated with operator.add means each branch appends instead of overwriting
    reviews: Annotated[list[str], operator.add]
    final_verdict: str

def review_security(state: ParallelState):
    result = llm.invoke(f"Review for security vulnerabilities: {state['input']}")
    return {"reviews": [f"SECURITY: {result.content}"]}

def review_performance(state: ParallelState):
    result = llm.invoke(f"Review for performance issues: {state['input']}")
    return {"reviews": [f"PERFORMANCE: {result.content}"]}

def review_style(state: ParallelState):
    result = llm.invoke(f"Review for code style issues: {state['input']}")
    return {"reviews": [f"STYLE: {result.content}"]}

def synthesize_node(state: ParallelState):
    all_reviews = "\n\n".join(state["reviews"])
    verdict = llm.invoke(f"Summarize these code reviews:\n{all_reviews}")
    return {"final_verdict": verdict.content}

builder = StateGraph(ParallelState)
builder.add_node("security", review_security)
builder.add_node("performance", review_performance)
builder.add_node("style", review_style)
builder.add_node("synthesize", synthesize_node)
# Fan out to all three in parallel
builder.add_edge(START, "security")
builder.add_edge(START, "performance")
builder.add_edge(START, "style")
# Fan in — synthesize runs after all three complete
builder.add_edge("security", "synthesize")
builder.add_edge("performance", "synthesize")
builder.add_edge("style", "synthesize")
builder.add_edge("synthesize", END)
graph = builder.compile(checkpointer=checkpointer)

The reducer pattern is mandatory: Without Annotated[list, operator.add], LangGraph has no way to merge concurrent writes to the same state key. Parallel branches that update a plain key in the same super-step fail with an InvalidUpdateError instead of being combined. Always use reducers for fields that parallel branches write to.
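The merge rule can be sketched without LangGraph. The helper below, apply_parallel_updates, is a hypothetical toy written for this post: it reads the reducer from the Annotated metadata and rejects a second write to a plain key. It is not LangGraph's implementation and only approximates the behavior described above.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class ReviewState(TypedDict):
    input: str
    reviews: Annotated[list[str], operator.add]  # reducer: concatenate lists
    final_verdict: str                           # plain key: one write per step


def apply_parallel_updates(schema, state, updates):
    """Toy merge of updates produced by branches that ran in the same step."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    written = set()
    for update in updates:
        for key, value in update.items():
            metadata = getattr(hints[key], "__metadata__", ())
            if metadata:
                # First metadata item is treated as the reducer
                merged[key] = metadata[0](merged.get(key, []), value)
            elif key in written:
                raise ValueError(f"multiple writes to '{key}' without a reducer")
            else:
                merged[key] = value
            written.add(key)
    return merged


state = {"input": "def f(): pass", "reviews": [], "final_verdict": ""}
branch_outputs = [
    {"reviews": ["SECURITY: ok"]},
    {"reviews": ["PERFORMANCE: ok"]},
    {"reviews": ["STYLE: ok"]},
]
merged = apply_parallel_updates(ReviewState, state, branch_outputs)
```

All three reviews survive because the reducer concatenates them. Two branches writing final_verdict in the same step would raise instead.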
When to add a store here: If you're doing parallelization for guardrails (one branch is a safety checker), store the safety decisions for audit logging — not for agent personalization.
Pattern 4: Orchestrator-Workers
What it is: A central orchestrator LLM dynamically breaks a task into subtasks and delegates to worker LLMs. Workers report back; orchestrator synthesizes. Unlike parallelization, subtasks aren't predefined — the orchestrator decides at runtime.
When Anthropic recommends it: Complex tasks where you can't predict required subtasks. Examples: coding agents that edit multiple files; research tasks gathering from multiple sources.
Memory architecture: Checkpointer + BaseStore. This is the pattern that most needs both.
The orchestrator needs to track overall task state (checkpointer) and the workers need access to accumulated knowledge about the user or domain (store). As workers complete subtasks, findings should persist across the orchestration run.
from typing import TypedDict, Annotated
import operator
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.runnables import RunnableConfig
from langgraph.store.base import BaseStore
import uuid
class OrchestratorState(TypedDict):
    task: str
    subtasks: list[str]
    results: Annotated[list[str], operator.add]
    final_output: str

def orchestrator_node(state: OrchestratorState, config: RunnableConfig, *, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    # Read relevant past work for this user from the store
    namespace = (user_id, "domain_knowledge")
    prior_knowledge = store.search(namespace, query=state["task"], limit=3)
    context = "\n".join([m.value["content"] for m in prior_knowledge])
    prompt = f"""Break this task into 2-4 independent subtasks.
Prior context about this user's domain:
{context if context else "No prior context."}
Task: {state["task"]}
Return a JSON list of subtask descriptions."""
    result = llm.invoke(prompt)
    # Parse subtasks from result (simplified)
    import json
    subtasks = json.loads(result.content)
    return {"subtasks": subtasks}

def worker_node(state: OrchestratorState, config: RunnableConfig, *, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    # Each worker tackles one subtask
    # In production, you'd fan out workers dynamically using Send()
    subtask = state["subtasks"][len(state["results"])]
    result = llm.invoke(f"Complete this subtask: {subtask}")
    # Workers persist findings to the store for future runs
    namespace = (user_id, "domain_knowledge")
    store.put(
        namespace,
        str(uuid.uuid4()),
        {"content": result.content, "subtask": subtask}
    )
    return {"results": [result.content]}

def synthesizer_node(state: OrchestratorState):
    combined = "\n\n".join(state["results"])
    final = llm.invoke(f"Synthesize these worker results into a final answer:\n{combined}")
    return {"final_output": final.content}

def should_continue(state: OrchestratorState):
    if len(state["results"]) < len(state["subtasks"]):
        return "worker"
    return "synthesize"

builder = StateGraph(OrchestratorState)
builder.add_node("orchestrator", orchestrator_node)
builder.add_node("worker", worker_node)
builder.add_node("synthesize", synthesizer_node)
builder.add_edge(START, "orchestrator")
builder.add_edge("orchestrator", "worker")
builder.add_conditional_edges("worker", should_continue)
builder.add_edge("synthesize", END)
graph = builder.compile(checkpointer=checkpointer, store=store)

Design principle: The orchestrator reads from the store to plan better. Workers write to the store as they discover things. Over time, the store becomes domain knowledge that makes every future orchestration run faster and more accurate. This is the compounding value of the store in this pattern.
Pattern 5: Evaluator-Optimizer
What it is: One LLM generates a response; another evaluates it and provides feedback in a loop. Continues until the evaluator is satisfied or a max iteration limit is hit.
When Anthropic recommends it: When clear evaluation criteria exist and iterative refinement provides measurable value. Examples: translation with nuance review; research tasks that loop until comprehensive.
Memory architecture: Checkpointer + store for evaluation criteria and past scores.
The loop state (drafts, scores, iterations) lives in the checkpointer. But evaluation criteria — what "good" looks like for this user — should live in the store and be loaded at the start of each run, not re-specified every time.
from typing import TypedDict, Annotated
import operator
from langgraph.graph import StateGraph, START, END
from langchain_core.runnables import RunnableConfig
from langgraph.store.base import BaseStore
class EvalState(TypedDict):
    task: str
    current_draft: str
    feedback: str
    iteration: int
    score: float
    final: str

MAX_ITERATIONS = 3
PASS_THRESHOLD = 0.8

def generator_node(state: EvalState, config: RunnableConfig, *, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    namespace = (user_id, "style_preferences")
    # Load user's style preferences from the store
    prefs = store.search(namespace, query="writing style", limit=2)
    style_context = "\n".join([p.value["content"] for p in prefs])
    # state.get() avoids a KeyError on the first pass, before any feedback exists
    prompt = f"""Task: {state['task']}
{"Previous feedback to incorporate: " + state['feedback'] if state.get('feedback') else ""}
{"User style preferences: " + style_context if style_context else ""}
Generate a high-quality response."""
    draft = llm.invoke(prompt)
    return {"current_draft": draft.content, "iteration": state.get("iteration", 0) + 1}

def evaluator_node(state: EvalState, config: RunnableConfig, *, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    namespace = (user_id, "eval_criteria")
    # Load evaluation criteria specific to this user/project
    criteria = store.search(namespace, query="quality criteria", limit=3)
    criteria_text = "\n".join([c.value["content"] for c in criteria])
    prompt = f"""Evaluate this response on a scale of 0.0 to 1.0.
Evaluation criteria:
{criteria_text if criteria_text else "- Accurate, clear, complete, well-structured"}
Response to evaluate:
{state['current_draft']}
Return JSON: {{"score": 0.0-1.0, "feedback": "specific improvement suggestions"}}"""
    result = llm.invoke(prompt)
    import json
    evaluation = json.loads(result.content)
    return {"score": evaluation["score"], "feedback": evaluation["feedback"]}

def should_iterate(state: EvalState):
    if state["score"] >= PASS_THRESHOLD:
        return "accept"
    if state.get("iteration", 0) >= MAX_ITERATIONS:
        return "accept"  # Hit max iterations, so accept the latest draft
    return "regenerate"

def accept_node(state: EvalState):
    return {"final": state["current_draft"]}

builder = StateGraph(EvalState)
builder.add_node("generate", generator_node)
builder.add_node("evaluate", evaluator_node)
builder.add_node("accept", accept_node)
builder.add_edge(START, "generate")
builder.add_edge("generate", "evaluate")
builder.add_conditional_edges(
    "evaluate",
    should_iterate,
    {"regenerate": "generate", "accept": "accept"}
)
builder.add_edge("accept", END)
graph = builder.compile(checkpointer=checkpointer, store=store)

Critical: always set MAX_ITERATIONS. Without a stopping condition, an evaluator that never hits the threshold will loop forever and burn through your token budget. Anthropic explicitly calls this out: agents need stopping conditions to maintain control.
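The loop is also only as reliable as the evaluator's output format. The bare json.loads call in evaluator_node fails as soon as the model wraps its JSON in extra text. A defensive parser is one way to handle that. The sketch below is a hypothetical helper (parse_evaluation is not part of any library), and it assumes an unparseable reply should count as a failing score so the loop keeps iterating until MAX_ITERATIONS.

```python
import json
import re


def parse_evaluation(text: str) -> dict:
    """Best-effort parse of an evaluator reply into {"score", "feedback"}."""
    # Grab the outermost {...} span so surrounding prose or fences are ignored
    match = re.search(r"\{.*\}", text, re.DOTALL)
    try:
        data = json.loads(match.group(0)) if match else {}
    except json.JSONDecodeError:
        data = {}
    try:
        score = float(data.get("score", 0.0))
    except (TypeError, ValueError):
        score = 0.0
    # Clamp to the 0.0-1.0 range the prompt asks for
    score = min(1.0, max(0.0, score))
    feedback = str(data.get("feedback", "Evaluator output could not be parsed."))
    return {"score": score, "feedback": feedback}


wrapped = parse_evaluation('Here is my evaluation: {"score": 0.9, "feedback": "tighten intro"} Thanks')
garbage = parse_evaluation("not json at all")
out_of_range = parse_evaluation('{"score": 7, "feedback": "x"}')
```

In evaluator_node, the return value of parse_evaluation(result.content) could be returned directly in place of the json.loads result.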
Pattern 6: Autonomous Agents
What it is: The LLM dynamically decides its own process — which tools to call, when to loop, when to stop. No predefined code path. Open-ended tasks where the number of steps can't be known in advance.
When Anthropic recommends it: Open-ended problems with trusted environments. Examples: SWE-bench coding tasks, computer use, research agents. Higher cost and compounding error risk — requires sandbox testing and guardrails.
Memory architecture: Checkpointer + BaseStore + short-term context management.
Autonomous agents are the only pattern where you need all three concerns actively managed: session state (checkpointer), persistent knowledge (store), and context window management (trim or summarize messages to prevent overflow).
from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage, RemoveMessage, trim_messages
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.runnables import RunnableConfig
from langgraph.store.base import BaseStore
import uuid
# --- Context window management ---
def manage_context(state: MessagesState):
    """Drop the oldest messages from state so the next LLM call fits the context window."""
    trimmed = trim_messages(
        state["messages"],
        max_tokens=6000,  # Leave room for system prompt + response
        strategy="last",  # Keep the most recent messages
        token_counter=llm,
        allow_partial=False,
    )
    # MessagesState merges updates with add_messages, so returning the trimmed
    # list would not delete anything. Emit a RemoveMessage per dropped message.
    kept_ids = {m.id for m in trimmed}
    return {
        "messages": [
            RemoveMessage(id=m.id) for m in state["messages"] if m.id not in kept_ids
        ]
    }

# --- Memory-aware agent node ---
def agent_node(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    # Load long-term memory: what the agent knows about this user
    preferences = store.search(
        (user_id, "preferences"), query=state["messages"][-1].content, limit=3
    )
    past_decisions = store.search(
        (user_id, "decisions"), query=state["messages"][-1].content, limit=2
    )
    pref_text = "\n".join([p.value["content"] for p in preferences])
    decision_text = "\n".join([d.value["summary"] for d in past_decisions])
    system = f"""You are an autonomous agent. Use your tools to complete the task.
What I know about this user:
{pref_text if pref_text else "No prior preferences stored."}
Relevant past decisions:
{decision_text if decision_text else "No relevant past decisions."}
Think step by step. Use tools when you need external data.
Stop when the task is complete."""
    response = llm_with_tools.invoke(
        [{"role": "system", "content": system}] + state["messages"]
    )
    return {"messages": [response]}

# --- Memory save node (runs after agent responds) ---
def save_memory_node(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    """Extract and persist key facts after each agent turn."""
    user_id = config["configurable"]["user_id"]
    # Only save if there's a meaningful exchange to record
    if len(state["messages"]) < 2:
        return state
    last_human = next(
        (m.content for m in reversed(state["messages"]) if m.type == "human"), None
    )
    last_ai = next(
        (m.content for m in reversed(state["messages"]) if m.type == "ai"), None
    )
    if not (last_human and last_ai):
        return state
    # Use a cheap model for extraction, not your main model
    extraction = llm_haiku.invoke(
        f"""Extract any important user preferences or decisions from this exchange.
If nothing significant, respond with exactly: NOTHING
Human: {last_human[:500]}
Agent: {last_ai[:500]}"""
    )
    if extraction.content.strip() != "NOTHING":
        store.put(
            (user_id, "preferences"),
            str(uuid.uuid4()),
            {"content": extraction.content}
        )
    return state

# --- Wire it together ---
tool_node = ToolNode(tools=[search_tool, calculator_tool, fetch_data_tool])
builder = StateGraph(MessagesState)
builder.add_node("trim_context", manage_context)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)
builder.add_node("save_memory", save_memory_node)
builder.add_edge(START, "trim_context")
builder.add_edge("trim_context", "agent")
# Route tool calls to the tool node; when the agent is done, save memory before ending
builder.add_conditional_edges(
    "agent",
    tools_condition,
    {"tools": "tools", END: "save_memory"},
)
builder.add_edge("tools", "trim_context")  # Trim before going back to agent
builder.add_edge("save_memory", END)
graph = builder.compile(checkpointer=checkpointer, store=store)

Three things that prevent autonomous agent failures:
- Context trimming before every LLM call. Long-running agents accumulate messages fast. Use trim_messages or a summarize node; without it, a long-running session will eventually exceed the model's context window.
- A stopping condition. Anthropic is explicit: use a max iteration count or explicit completion check. An autonomous agent without an exit condition will loop until it errors or burns your budget.
- save_memory runs after, not before, the agent responds. Saving before the response adds latency to every turn. Use a cheap model for extraction. In the graph above the save node still runs inside the same invocation, so move it to a background job if its latency matters.
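The graph above has no explicit iteration counter, so the stopping condition has to come from somewhere else. One option is LangGraph's recursion_limit, a top-level config key that caps the number of super-steps in a run (the default is 25) and raises GraphRecursionError when the cap is exceeded. The sketch below only builds the config dict. build_run_config is a hypothetical helper name and the limit of 15 is an arbitrary example value.

```python
def build_run_config(thread_id: str, user_id: str, max_steps: int = 25) -> dict:
    """Build an invoke config with a hard cap on super-steps (hypothetical helper)."""
    return {
        # recursion_limit sits at the top level, not inside "configurable"
        "recursion_limit": max_steps,
        "configurable": {"thread_id": thread_id, "user_id": user_id},
    }


config = build_run_config("session-abc-123", "user-456", max_steps=15)
# Passing this config to graph.invoke(...) would stop a runaway tool loop;
# wrap the call in try/except langgraph.errors.GraphRecursionError to handle it.
```

A step cap is a blunt instrument. It complements an explicit completion check in the agent prompt and does not replace one.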
The Decision Framework
Use this before designing any agent:
1. Is the task decomposable into FIXED steps?
   YES → Prompt Chaining
   Memory: Checkpointer only
2. Does the input need to go to DIFFERENT specialized handlers?
   YES → Routing
   Memory: Checkpointer + Store (to inform routing decisions)
3. Can subtasks run INDEPENDENTLY at the same time?
   YES → Parallelization
   Memory: Checkpointer + reducer-annotated state fields
4. Do you need to DYNAMICALLY DECIDE subtasks at runtime?
   YES → Orchestrator-Workers
   Memory: Checkpointer + Store (workers accumulate domain knowledge)
5. Does the output need ITERATIVE REFINEMENT against clear criteria?
   YES → Evaluator-Optimizer
   Memory: Checkpointer + Store (criteria and style preferences persist)
   ALWAYS set MAX_ITERATIONS
6. Is the task OPEN-ENDED with an unknown number of steps?
   YES → Autonomous Agent
   Memory: Checkpointer + Store + context trimming
   ALWAYS set stopping conditions
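The checklist above can also be written as a small lookup, which is handy when a system combines several patterns. This is a sketch only: TaskTraits and recommend are hypothetical names, and returning every matching pattern (rather than the first) is an assumption made here to reflect systems that combine patterns.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskTraits:
    fixed_steps: bool = False
    specialized_handlers: bool = False
    independent_subtasks: bool = False
    dynamic_subtasks: bool = False
    iterative_refinement: bool = False
    open_ended: bool = False


# (trait, pattern, memory architecture) in the same order as the six questions
RULES = [
    ("fixed_steps", "Prompt Chaining", "Checkpointer only"),
    ("specialized_handlers", "Routing", "Checkpointer + Store"),
    ("independent_subtasks", "Parallelization", "Checkpointer + reducer-annotated state fields"),
    ("dynamic_subtasks", "Orchestrator-Workers", "Checkpointer + Store"),
    ("iterative_refinement", "Evaluator-Optimizer", "Checkpointer + Store + MAX_ITERATIONS"),
    ("open_ended", "Autonomous Agent", "Checkpointer + Store + context trimming + stopping conditions"),
]


def recommend(traits: TaskTraits) -> list[tuple[str, str]]:
    """Return (pattern, memory) for every question answered YES."""
    return [(pattern, memory) for attr, pattern, memory in RULES if getattr(traits, attr)]


simple = recommend(TaskTraits(fixed_steps=True))
combined = recommend(TaskTraits(specialized_handlers=True, iterative_refinement=True))
```

When more than one pattern matches, the memory requirement is the union: if any matched pattern needs the store, compile the graph with both checkpointer and store.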
Memory Backend: When to Use What
The code examples above use InMemorySaver and InMemoryStore for clarity. Here's what to use in production:
| Stage | Checkpointer | Store |
|---|---|---|
| Dev / testing | InMemorySaver | InMemoryStore |
| Local persistent dev | SqliteSaver.from_conn_string("agent.db") | No SQLite store; use InMemoryStore |
| Production (single server) | PostgresSaver.from_conn_string(DB_URI) | PostgresStore.from_conn_string(DB_URI) |
| Production (multi-server) | PostgresSaver with connection pool | PostgresStore with connection pool |
Required first-time setup for Postgres:
# Run once — creates the required tables
checkpointer.setup()
store.setup()

Connection pooling for production:
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver
# Pool of 10 connections shared across concurrent requests
# Without this: each request opens a new TCP connection
# At 100 concurrent users = 100 open Postgres connections
connection_kwargs = {"autocommit": True, "prepare_threshold": 0}
pool = ConnectionPool(
    conninfo=DB_URI,
    max_size=10,
    kwargs=connection_kwargs,
)
checkpointer = PostgresSaver(pool)

Managing Short-Term Memory Overflow
For patterns that run multi-turn conversations (routing, orchestrator-workers, autonomous agents), message history grows without bound. Two strategies:
Strategy 1: Trim (fast, lossy)
from langchain_core.messages import trim_messages
def call_model(state: MessagesState):
    # Keep only the last 6000 tokens before calling the LLM
    messages = trim_messages(
        state["messages"],
        max_tokens=6000,
        strategy="last",
        token_counter=llm,
        allow_partial=False,
        include_system=True,  # Always keep system message
    )
    return {"messages": [llm.invoke(messages)]}

Strategy 2: Summarize (slower, preserves meaning)
from langchain_core.messages import RemoveMessage
from langgraph.graph.message import REMOVE_ALL_MESSAGES  # needs a recent LangGraph release

def summarize_node(state: MessagesState):
    summary = llm.invoke(
        state["messages"]
        + [{"role": "user", "content": "Summarize this conversation in 3 sentences."}]
    )
    # MessagesState appends updates by default, so clear the history explicitly,
    # then keep only the summary + last 2 messages
    return {
        "messages": [
            RemoveMessage(id=REMOVE_ALL_MESSAGES),
            {"role": "system", "content": f"Conversation summary: {summary.content}"},
        ] + state["messages"][-2:]
    }

def should_summarize(state: MessagesState):
    # Trigger summarization when message count exceeds threshold
    return "summarize" if len(state["messages"]) > 20 else "agent"

Use trimming for fast-moving tool-heavy agents where recent context matters most. Use summarization for conversational agents where early context (user's original goal) needs to persist.
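To see why trimming is lossy, here is a toy version of the keep-the-newest idea, using only the standard library. It is not how trim_messages is implemented: count_tokens is a crude word counter standing in for a real tokenizer, and trim_last is a hypothetical helper written for this illustration.

```python
def count_tokens(text: str) -> int:
    """Crude stand-in for a tokenizer: one token per whitespace-separated word."""
    return len(text.split())


def trim_last(messages, max_tokens):
    """Keep a leading system message plus the newest whole messages that fit."""
    system = [messages[0]] if messages and messages[0]["role"] == "system" else []
    budget = max_tokens - sum(count_tokens(m["content"]) for m in system)
    kept = []
    # Walk backwards from the newest message until the budget runs out
    for message in reversed(messages[len(system):]):
        cost = count_tokens(message["content"])
        if cost > budget:
            break
        kept.append(message)
        budget -= cost
    return system + kept[::-1]


history = [
    {"role": "system", "content": "You are helpful"},
    {"role": "user", "content": "Forecast demand for SKU 12"},
    {"role": "assistant", "content": "Which region and horizon"},
    {"role": "user", "content": "EU next quarter"},
]
trimmed = trim_last(history, max_tokens=10)
```

With a budget of 10 "tokens", the first user message (the original goal) is the one that gets dropped. That is the failure mode summarization is meant to avoid.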
Production Checklist
Before shipping any agent:
- Checkpointer backend: InMemorySaver only in dev; switch to PostgresSaver before production
- Call checkpointer.setup(): Required once on first Postgres use to create the tables
- Connection pooling: Required for multi-server deployments or >10 concurrent users
- thread_id and user_id are different: thread_id scopes sessions, user_id scopes users; both go in config["configurable"]
- Namespace by user from day one: Always (user_id, category), never a shared flat namespace
- Reducers on parallel state fields: Any field written by parallel branches needs Annotated[list, operator.add] or equivalent
- MAX_ITERATIONS on every loop: Evaluator-optimizer and autonomous agents must have stopping conditions
- Context trimming on long-running agents: Add before every LLM call in agents that run >10 turns
- Memory saves are post-response: Run save_memory after the agent responds, not before, to keep hot-path latency low
- Use cheap models for extraction: claude-haiku-4-5 for memory extraction, not your main model
Related Posts
- Adding Memory to Production AI Agents: Mem0, Zep, and LangMem Compared (when to go beyond LangGraph's built-in store and add a dedicated memory layer)
- Building AI Agents with LangGraph: From Prototype to Production (durable execution, interrupt(), and the @task pattern)
- LangSmith in Production: Observability, Evaluation, and Debugging AI Agents (tracing memory retrieval and building evaluation loops for agent quality)
Building a multi-agent system and not sure which patterns to combine or how to wire the memory layer? I've shipped all six of these patterns across BandiFinder, RevAgent, Pellemoda, and Holding Morelli — EU-facing products with real compliance requirements. Get in touch or book a call.