MCP Server Performance Optimization
Introduction
As your MCP servers move from development to production, performance becomes critical. Slow tool responses degrade the AI experience, and unoptimized servers waste resources and increase costs. This tutorial covers practical strategies for improving MCP server performance, from caching and connection pooling to rate limiting and monitoring.
For deployment-specific optimizations, see our Docker, Kubernetes, and Lambda tutorials.
Measuring Performance
Before optimizing, establish baselines. Instrument your server to track key metrics.
Key Metrics
- Tool response latency (p50, p95, p99)
- Throughput (requests per second)
- Error rate (failed requests as a fraction of total requests)
- Resource utilization (CPU, memory, connections)
Instrumentation with TypeScript
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';
/** One timing sample for a single tool invocation. */
interface Metric {
  /** Name of the tool that was invoked. */
  tool: string;
  /** Elapsed time of the call in milliseconds. */
  duration: number;
  /** Whether the wrapped call resolved or failed. */
  status: 'success' | 'error';
  /** `Date.now()` value captured when the call started. */
  timestamp: number;
}

/** Maximum number of samples kept in memory; older samples are discarded first. */
const MAX_METRICS = 10000;

/** In-memory sample buffer, oldest first. */
const metrics: Metric[] = [];

/** Appends one sample and trims the buffer so it never exceeds MAX_METRICS entries. */
function recordMetric(tool: string, start: number, status: Metric['status']): void {
  metrics.push({ tool, duration: Date.now() - start, status, timestamp: start });
  if (metrics.length > MAX_METRICS) {
    // Without this the array grows for the lifetime of the process.
    metrics.splice(0, metrics.length - MAX_METRICS);
  }
}

/**
 * Runs `fn`, records how long it took and whether it succeeded, and passes
 * its outcome through unchanged.
 *
 * @param toolName - Label stored with the sample.
 * @param fn - The work to time.
 * @returns A promise that settles exactly as `fn` does.
 * @throws Re-throws (as a rejection) whatever `fn` throws or rejects with.
 */
async function withMetrics<T>(toolName: string, fn: () => Promise<T>): Promise<T> {
  const start = Date.now();
  try {
    // Calling fn inside the try also captures a synchronous throw, which a
    // plain `fn().then(...).catch(...)` chain would let escape unrecorded.
    const result = await fn();
    recordMetric(toolName, start, 'success');
    return result;
  } catch (error) {
    recordMetric(toolName, start, 'error');
    throw error;
  }
}
// Metrics endpoint: per-tool latency summary over the last 5 minutes
app.get('/metrics', (req, res) => {
  const WINDOW_MS = 300000; // Last 5 minutes
  const now = Date.now();

  // Bucket the durations of recent samples by tool name.
  const durationsByTool = new Map<string, number[]>();
  metrics
    .filter((sample) => now - sample.timestamp < WINDOW_MS)
    .forEach(({ tool, duration }) => {
      const bucket = durationsByTool.get(tool);
      if (bucket) {
        bucket.push(duration);
      } else {
        durationsByTool.set(tool, [duration]);
      }
    });

  // Picks the element at index floor(length * fraction) of an ascending array.
  const pick = (sorted: number[], fraction: number) => sorted[Math.floor(sorted.length * fraction)];

  const summary: Record<string, { p50: number; p95: number; count: number }> = {};
  durationsByTool.forEach((durations, tool) => {
    const sorted = [...durations].sort((a, b) => a - b);
    summary[tool] = {
      p50: pick(sorted, 0.5),
      p95: pick(sorted, 0.95),
      count: sorted.length,
    };
  });

  res.json(summary);
});
Python Instrumentation
import time
import json
from functools import wraps
from collections import defaultdict
# Samples per function name; each sample is {"duration": <milliseconds>, "status": "success" | "error"}.
tool_metrics = defaultdict(list)


def track_performance(func):
    """Decorator that records the duration and outcome of every call to ``func``.

    Works for both plain functions and ``async def`` functions. For a coroutine
    function the measurement covers the awaited execution, not just the creation
    of the coroutine object, and the returned wrapper is itself a coroutine
    function.

    Args:
        func: The function to instrument.

    Returns:
        A wrapper with the same call signature that appends one sample to
        ``tool_metrics[func.__name__]`` per call and otherwise behaves like
        ``func``. Exceptions are re-raised unchanged.
    """
    import inspect  # local import: only needed here

    def record(start, status):
        duration = (time.monotonic() - start) * 1000  # seconds -> milliseconds
        tool_metrics[func.__name__].append({"duration": duration, "status": status})

    if inspect.iscoroutinefunction(func):

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.monotonic()
            try:
                result = await func(*args, **kwargs)
            except Exception:
                record(start, "error")
                raise
            record(start, "success")
            return result

        return async_wrapper

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.monotonic()
        try:
            result = func(*args, **kwargs)
        except Exception:
            record(start, "error")
            raise
        record(start, "success")
        return result

    return wrapper
# Example tool: registered via mcp.tool() and instrumented by track_performance.
@mcp.tool()
@track_performance
def my_tool(data: str) -> str:
    """Process data.

    Args:
        data: Input data
    """
    processed = process(data)
    return processed
Caching Strategies
Caching is the most impactful optimization for MCP servers that access external data or perform expensive computations.
In-Memory Cache
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';
/** A cached value together with the moment it stops being valid. */
interface CacheEntry<T> {
  value: T;
  /** `Date.now()`-based timestamp (ms) after which the entry is treated as expired. */
  expiresAt: number;
}

/**
 * Small in-memory cache with a per-entry TTL and a size cap.
 *
 * Entries are kept in insertion order; when the cache is full, the entry that
 * was written longest ago is evicted to make room.
 */
class SimpleCache<T> {
  private readonly cache = new Map<string, CacheEntry<T>>();

  /**
   * @param maxSize - Number of entries at which `set` starts evicting the oldest entry.
   */
  constructor(private readonly maxSize = 1000) {}

  /**
   * Looks up `key`.
   *
   * @returns The stored value, or `undefined` when the key is absent or its
   *          entry has expired (an expired entry is removed on the way out).
   */
  get(key: string): T | undefined {
    const entry = this.cache.get(key);
    if (!entry) return undefined;
    if (Date.now() > entry.expiresAt) {
      this.cache.delete(key);
      return undefined;
    }
    return entry.value;
  }

  /**
   * Stores `value` under `key` for `ttlMs` milliseconds.
   *
   * @param key - Cache key; any string, including the empty string.
   * @param value - Value to store.
   * @param ttlMs - Lifetime of the entry in milliseconds.
   */
  set(key: string, value: T, ttlMs: number): void {
    // Remove any existing entry first: an overwrite then never evicts an
    // unrelated entry, and the key moves to the "newest" end of the order.
    this.cache.delete(key);

    if (this.cache.size >= this.maxSize) {
      // Evict oldest entry. Check `done` rather than truthiness of the key,
      // because '' is a valid key and must be evictable.
      const oldest = this.cache.keys().next();
      if (!oldest.done) this.cache.delete(oldest.value);
    }

    this.cache.set(key, { value, expiresAt: Date.now() + ttlMs });
  }
}
// Serialized weather payloads, keyed by lower-cased city name.
const cache = new SimpleCache<string>();

server.tool(
  'get-weather',
  'Get weather for a city',
  { city: z.string() },
  async ({ city }) => {
    const key = `weather:${city.toLowerCase()}`;

    // Serve from the cache when possible; otherwise fetch, serialize, and
    // remember the payload.
    let text = cache.get(key);
    if (!text) {
      text = JSON.stringify(await fetchWeather(city));
      cache.set(key, text, 300000); // Cache for 5 minutes
    }

    return { content: [{ type: 'text', text }] };
  }
);
Python Caching with TTL
from functools import lru_cache
from time import time
from mcp.server.fastmcp import FastMCP
# Server instance for this example; tools below are registered on it with @mcp.tool().
mcp = FastMCP("cached-server")
class TTLCache:
    """In-memory cache with a fixed time-to-live per entry and a hard size cap.

    Entries are stored in insertion order. When the cache is full, expired
    entries are dropped first; if that does not free a slot, the oldest
    remaining entries are evicted so the cache never exceeds ``max_size``.
    """

    def __init__(self, ttl_seconds: int = 300, max_size: int = 1000):
        """
        Args:
            ttl_seconds: Lifetime of every entry, in seconds.
            max_size: Maximum number of entries kept at once.
        """
        # key -> (expiry as a time() timestamp, value)
        self._cache: dict[str, tuple[float, object]] = {}
        self._ttl = ttl_seconds
        self._max_size = max_size

    def get(self, key: str):
        """Return the value stored for ``key``, or None if absent or expired.

        An expired entry is deleted as a side effect of the lookup.
        """
        entry = self._cache.get(key)
        if entry is None:
            return None
        expires, value = entry
        if time() < expires:
            return value
        del self._cache[key]
        return None

    def set(self, key: str, value):
        """Store ``value`` under ``key`` with the cache's TTL.

        Args:
            key: Cache key.
            value: Value to store.
        """
        now = time()
        # Drop any previous entry so an overwrite does not count against the
        # size cap and the key moves to the newest position.
        self._cache.pop(key, None)
        if len(self._cache) >= self._max_size:
            # Remove expired entries first
            self._cache = {k: v for k, v in self._cache.items() if v[0] > now}
            # Still full: evict oldest entries until there is room for one more.
            while self._cache and len(self._cache) >= self._max_size:
                del self._cache[next(iter(self._cache))]
        self._cache[key] = (now + self._ttl, value)
# Module-level cache shared by every call; entries use a 300-second TTL.
cache = TTLCache(ttl_seconds=300)


@mcp.tool()
def get_user_profile(user_id: str) -> str:
    """Get user profile with caching.

    Args:
        user_id: User ID to look up
    """
    key = f"user:{user_id}"
    profile_json = cache.get(key)
    if not profile_json:
        # Cache miss: load the profile, serialize it, and remember the JSON text.
        profile_json = json.dumps(fetch_profile_from_db(user_id))
        cache.set(key, profile_json)
    return profile_json
Redis Cache for Multi-Instance Deployments
When running multiple MCP server instances (in Kubernetes, for example), use Redis for shared caching:
import Redis from 'ioredis';
// Redis client used by cachedFetch; connection target comes from REDIS_URL.
const redis = new Redis(process.env.REDIS_URL);

/**
 * Read-through cache backed by Redis.
 *
 * @param key - Redis key to read and, on a miss, write.
 * @param ttlSeconds - Expiry passed to SETEX when storing a fresh result.
 * @param fetchFn - Produces the value when nothing usable is cached.
 * @returns The JSON-parsed cached value, or the fresh result of `fetchFn`.
 */
async function cachedFetch<T>(key: string, ttlSeconds: number, fetchFn: () => Promise<T>): Promise<T> {
  const hit = await redis.get(key);
  if (!hit) {
    // Miss: compute the value, store its JSON form, and hand back the original.
    const fresh = await fetchFn();
    await redis.setex(key, ttlSeconds, JSON.stringify(fresh));
    return fresh;
  }
  return JSON.parse(hit);
}
server.tool(
  'query-database',
  'Query with caching',
  { query: z.string() },
  async ({ query }) => {
    // The cache key is the query text itself, base64-encoded.
    const cacheKey = `query:${Buffer.from(query).toString('base64')}`;
    const runQuery = () => executeQuery(query);

    // Results are cached for 60 seconds.
    const rows = await cachedFetch(cacheKey, 60, runQuery);
    const text = JSON.stringify(rows);
    return { content: [{ type: 'text', text }] };
  }
);
Connection Pooling
Database Connection Pools
Never create a new database connection per tool call. Create one pool at startup and borrow connections from it:
import { Pool } from 'pg';
// Create pool once at startup
const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
  max: 20, // Maximum connections
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 5000,
});

// NOTE(review): this tool executes whatever SQL text it is given; confirm the
// database role behind DATABASE_URL is restricted accordingly.
server.tool(
  'query',
  'Execute a database query',
  { sql: z.string() },
  async ({ sql }) => {
    // Borrow a connection for the duration of this call.
    const client = await pool.connect();
    try {
      const { rows } = await client.query(sql);
      const text = JSON.stringify(rows);
      return { content: [{ type: 'text', text }] };
    } finally {
      // Return connection to pool, even when the query throws.
      client.release();
    }
  }
);
HTTP Connection Reuse
import httpx
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("pooled-server")

# Create a shared HTTP client with connection pooling
_pool_limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
http_client = httpx.AsyncClient(timeout=30.0, limits=_pool_limits)
@mcp.tool()
async def fetch_api(endpoint: str) -> str:
    """Fetch data from an API with connection reuse.

    Args:
        endpoint: API endpoint to call
    """
    # NOTE(review): the response body is returned whatever the HTTP status is;
    # confirm callers expect error pages to come back as ordinary text.
    url = f"https://api.example.com/{endpoint}"
    response = await http_client.get(url)
    return response.text
Rate Limiting
Rate limiting protects your server and downstream services from overload.
Token Bucket Algorithm
/**
 * Token-bucket rate limiter.
 *
 * The bucket starts full, refills continuously at `refillRate` tokens per
 * second, and never holds more than `maxTokens`.
 */
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  /**
   * @param maxTokens - Bucket capacity, and the initial number of tokens.
   * @param refillRate - Tokens added per second.
   */
  constructor(
    private maxTokens: number,
    private refillRate: number, // tokens per second
  ) {
    this.tokens = maxTokens;
    this.lastRefill = Date.now();
  }

  /**
   * Attempts to take `count` tokens from the bucket.
   *
   * @param count - Tokens to take; must be a finite number >= 0.
   * @returns `true` if the tokens were available and have been deducted,
   *          `false` otherwise (including for a negative or non-finite `count`).
   */
  tryConsume(count = 1): boolean {
    // A negative count would pass the availability check and then *add*
    // tokens, pushing the bucket above maxTokens.
    if (!Number.isFinite(count) || count < 0) return false;

    this.refill();
    if (this.tokens >= count) {
      this.tokens -= count;
      return true;
    }
    return false;
  }

  /** Credits the tokens earned since the previous refill, capped at `maxTokens`. */
  private refill(): void {
    const now = Date.now();
    // Date.now() can step backwards when the system clock is adjusted; treat
    // that as "no time passed" so the balance is never reduced by a refill.
    const elapsed = Math.max(0, now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.maxTokens, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;
  }
}
// Per-tool rate limiters
const rateLimiters = new Map<string, TokenBucket>();

/**
 * Returns the rate limiter for `tool`, creating it on first use.
 *
 * `maxPerMinute` is only applied when the limiter is created; later calls for
 * the same tool get the existing limiter regardless of the value passed.
 *
 * @param tool - Tool name used as the lookup key.
 * @param maxPerMinute - Bucket capacity; the refill rate is this value divided by 60.
 */
function getRateLimiter(tool: string, maxPerMinute: number): TokenBucket {
  const existing = rateLimiters.get(tool);
  if (existing !== undefined) {
    return existing;
  }
  const created = new TokenBucket(maxPerMinute, maxPerMinute / 60);
  rateLimiters.set(tool, created);
  return created;
}
Tiered Rate Limits
from enum import Enum, unique


@unique  # two tiers sharing a value would otherwise silently become aliases
class Tier(Enum):
    """Subscription tiers that rate limits are defined for."""

    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"


# Request allowances per tier: a per-minute rate and a per-day total.
RATE_LIMITS = {
    Tier.FREE: {"requests_per_minute": 10, "daily_limit": 100},
    Tier.PRO: {"requests_per_minute": 60, "daily_limit": 10000},
    Tier.ENTERPRISE: {"requests_per_minute": 300, "daily_limit": 1000000},
}
Async and Concurrent Processing
Parallel Tool Execution
When a tool needs to fetch data from multiple sources, issue the requests concurrently instead of one after another:
import asyncio
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("parallel-server")


@mcp.tool()
async def aggregate_data(sources: list[str], ctx: Context) -> str:
    """Fetch and aggregate data from multiple sources in parallel.

    Args:
        sources: List of data source identifiers
    """
    # One client for the whole batch, so the concurrent requests share a
    # single client instead of opening and closing one per source.
    async with httpx.AsyncClient() as client:

        async def fetch_source(source: str) -> dict:
            response = await client.get(f"https://api.example.com/{source}")
            return {"source": source, "data": response.json()}

        # Fetch all sources concurrently; failures come back as exception
        # objects in the result list instead of aborting the whole batch.
        results = await asyncio.gather(
            *(fetch_source(s) for s in sources), return_exceptions=True
        )

    # gather(return_exceptions=True) can return any BaseException (for example
    # CancelledError), so test against BaseException rather than Exception.
    successful = [r for r in results if not isinstance(r, BaseException)]
    failed = [str(r) for r in results if isinstance(r, BaseException)]

    return json.dumps({
        "results": successful,
        "errors": failed,
        "total": len(sources),
        "succeeded": len(successful),
    })
Response Optimization
Truncate Large Responses
import zlib from 'zlib';
/**
 * Shrinks an oversized JSON-array payload to its first `maxItems` elements.
 *
 * Despite the name, nothing is compressed: a payload is either returned
 * unchanged or replaced by a truncated summary object.
 *
 * @param data - Serialized tool result.
 * @param threshold - Payloads shorter than this many characters are returned as-is.
 * @param maxItems - Maximum number of array elements kept when truncating.
 * @returns `data` unchanged when it is short, is not valid JSON, is not an
 *          array, or has at most `maxItems` elements; otherwise a JSON string
 *          with `total`, `showing`, `data`, and `message` fields.
 */
function compressIfLarge(data: string, threshold = 10000, maxItems = 100): string {
  if (data.length < threshold) return data;

  let parsed: unknown;
  try {
    parsed = JSON.parse(data);
  } catch {
    // Large plain-text results are not JSON; pass them through instead of throwing.
    return data;
  }

  const limit = Math.max(0, Math.floor(maxItems));
  if (!Array.isArray(parsed) || parsed.length <= limit) {
    return data;
  }

  // For very large results, return a summary with option to paginate
  return JSON.stringify({
    total: parsed.length,
    showing: limit,
    data: parsed.slice(0, limit),
    message: 'Results truncated. Use pagination for full results.',
  });
}
Pagination for Large Datasets
@mcp.tool()
def query_with_pagination(
    table: str,
    page: int = 1,
    page_size: int = 50
) -> str:
    """Query data with pagination.

    Args:
        table: Table to query
        page: Page number (starting from 1)
        page_size: Results per page (max 100)
    """
    import re  # local import: only needed for the identifier check below

    # A table name cannot be bound as a query parameter, so it is interpolated
    # into the SQL text. Accept only a plain identifier to keep arbitrary SQL out.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table):
        raise ValueError(f"Invalid table name: {table!r}")

    # Clamp paging inputs: page < 1 would yield a negative OFFSET, and
    # page_size <= 0 would divide by zero when computing total_pages.
    page = max(page, 1)
    page_size = max(1, min(page_size, 100))
    offset = (page - 1) * page_size

    results = db.execute(
        f"SELECT * FROM {table} LIMIT ? OFFSET ?",
        [page_size, offset]
    )
    total = db.execute(f"SELECT COUNT(*) FROM {table}")[0][0]

    return json.dumps({
        "data": results,
        "page": page,
        "page_size": page_size,
        "total": total,
        # Ceiling division: a partially filled last page still counts as a page.
        "total_pages": (total + page_size - 1) // page_size,
    })
Monitoring and Alerting
Prometheus Metrics
import { Counter, Histogram, Gauge, register } from 'prom-client';
// Tool-call counter, labelled by tool name and outcome.
const toolCallsTotal = new Counter({
  name: 'mcp_tool_calls_total',
  help: 'Total MCP tool calls',
  labelNames: ['tool', 'status'],
});

// Histogram bucket upper bounds, in seconds (10 ms up to 10 s).
const durationBucketsSeconds = [0.01, 0.05, 0.1, 0.5, 1, 5, 10];

// Tool-call latency histogram, labelled by tool name.
const toolDuration = new Histogram({
  name: 'mcp_tool_duration_seconds',
  help: 'MCP tool call duration',
  labelNames: ['tool'],
  buckets: durationBucketsSeconds,
});

// Gauge for the number of currently open connections.
const activeConnections = new Gauge({
  name: 'mcp_active_connections',
  help: 'Current active MCP connections',
});

// Expose metrics endpoint
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  const body = await register.metrics();
  res.end(body);
});
Health Check Endpoint
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import json
import time
class HealthHandler(BaseHTTPRequestHandler):
    """Answers every GET request with a JSON health snapshot and HTTP 200."""

    def do_GET(self):
        # NOTE(review): reads private attributes of `cache` and `mcp`
        # (`_cache`, `_tools`); confirm both exist on the objects in use.
        snapshot = {
            "status": "healthy",
            "uptime": time.time() - START_TIME,
            "cache_size": len(cache._cache),
            "tools_registered": len(mcp._tools),
        }

        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()

        body = json.dumps(snapshot).encode()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Suppress the base handler's per-request logging."""
# Reference point for the "uptime" value reported by HealthHandler.
START_TIME = time.time()


def _serve_health():
    """Serve health checks on port 8080, all interfaces, until the process exits."""
    HTTPServer(("0.0.0.0", 8080), HealthHandler).serve_forever()


# Daemon thread: it does not keep the process alive once the main thread ends.
threading.Thread(target=_serve_health, daemon=True).start()
Performance Checklist
- [ ] Tool response times are under 5 seconds (p95)
- [ ] Database queries use connection pooling
- [ ] Frequently accessed data is cached with appropriate TTLs
- [ ] HTTP clients reuse connections
- [ ] Large responses are paginated
- [ ] Rate limiting prevents resource exhaustion
- [ ] Metrics are collected and monitored
- [ ] Health checks are configured
- [ ] Async operations run concurrently where possible
- [ ] Memory usage is stable under load (no leaks)
Conclusion
MCP server performance directly impacts the AI user experience. Start by measuring your baseline, then apply caching and connection pooling for the biggest gains. Add rate limiting to protect your infrastructure, and set up monitoring to catch regressions early. These optimizations apply whether you deploy on Docker, Kubernetes, or AWS Lambda.
For more optimization patterns, browse our servers directory and review production MCP server implementations.