25 min read
Advanced

MCP Server Performance Optimization

Optimize MCP server performance with caching, connection pooling, rate limiting, and monitoring strategies

MCPgee Team

MCP Expert

  • A working MCP server in production or staging
  • Basic understanding of performance profiling
  • Familiarity with caching concepts
  • Experience with MCP server development

Introduction

As your MCP servers move from development to production, performance becomes critical. Slow tool responses degrade the AI experience, and unoptimized servers waste resources and increase costs. This tutorial covers practical strategies for improving MCP server performance, from caching and connection pooling to rate limiting and monitoring.

For deployment-specific optimizations, see our Docker, Kubernetes, and Lambda tutorials.

Measuring Performance

Before optimizing, establish baselines. Instrument your server to track key metrics.

Key Metrics

  1. Tool response latency (p50, p95, p99)
  2. Throughput (requests per second)
  3. Error rate (failures per total requests)
  4. Resource utilization (CPU, memory, connections)
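
The first three metrics above can all be derived from a list of raw call samples. The following is a minimal sketch, assuming each sample is a `(duration_ms, succeeded)` pair collected over a known time window; the `percentile` and `summarize` helpers and the sample data are illustrative names, not part of any MCP SDK. It uses the nearest-rank definition of a percentile, which is one of several common conventions.

```python
import math

def percentile(sorted_values: list[float], pct: int) -> float:
    """Nearest-rank percentile of an already sorted, non-empty list."""
    if not sorted_values:
        raise ValueError("no samples")
    rank = math.ceil(pct * len(sorted_values) / 100)  # 1-based rank
    return sorted_values[max(rank, 1) - 1]

def summarize(samples: list[tuple[float, bool]], window_seconds: float) -> dict:
    """Summarize (duration_ms, succeeded) samples from one time window."""
    durations = sorted(duration for duration, _ in samples)
    failures = sum(1 for _, ok in samples if not ok)
    return {
        "p50": percentile(durations, 50),
        "p95": percentile(durations, 95),
        "p99": percentile(durations, 99),
        "throughput": len(samples) / window_seconds,  # requests per second
        "error_rate": failures / len(samples),        # failures per total requests
        "count": len(samples),
    }

# Hypothetical data: 100 calls taking 1..100 ms within 50 seconds, 2 of them failed
samples = [(float(ms), ms not in (40, 90)) for ms in range(1, 101)]
summary = summarize(samples, window_seconds=50.0)
print(summary)
```

Tail percentiles (p95, p99) are usually more informative than averages here, because a handful of slow tool calls can dominate how responsive the assistant feels.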

Instrumentation with TypeScript

typescript
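import express from 'express';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';

// Express app that serves the /metrics endpoint below (assumes an HTTP-based deployment)
const app = express();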

interface Metric {
  tool: string;
  duration: number;
  status: 'success' | 'error';
  timestamp: number;
}

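// In-memory buffer for illustration only. It grows without bound, so in
// production prune old entries or export them to a metrics backend.
const metrics: Metric[] = [];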

function withMetrics<T>(toolName: string, fn: () => Promise<T>): Promise<T> {
  const start = Date.now();
  return fn()
    .then((result) => {
      metrics.push({ tool: toolName, duration: Date.now() - start, status: 'success', timestamp: start });
      return result;
    })
    .catch((error) => {
      metrics.push({ tool: toolName, duration: Date.now() - start, status: 'error', timestamp: start });
      throw error;
    });
}

// Metrics endpoint
app.get('/metrics', (req, res) => {
  const now = Date.now();
  const recentMetrics = metrics.filter((m) => now - m.timestamp < 300000); // Last 5 minutes

  const byTool = new Map<string, number[]>();
  for (const m of recentMetrics) {
    const durations = byTool.get(m.tool) || [];
    durations.push(m.duration);
    byTool.set(m.tool, durations);
  }

  const summary: Record<string, { p50: number; p95: number; count: number }> = {};
  for (const [tool, durations] of byTool) {
    durations.sort((a, b) => a - b);
    summary[tool] = {
      p50: durations[Math.floor(durations.length * 0.5)],
      p95: durations[Math.floor(durations.length * 0.95)],
      count: durations.length,
    };
  }

  res.json(summary);
});

Python Instrumentation

python
import time
import json
from functools import wraps
from collections import defaultdict

tool_metrics = defaultdict(list)

def track_performance(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.monotonic()
        try:
            result = func(*args, **kwargs)
            duration = (time.monotonic() - start) * 1000
            tool_metrics[func.__name__].append({"duration": duration, "status": "success"})
            return result
        except Exception:
            duration = (time.monotonic() - start) * 1000
            tool_metrics[func.__name__].append({"duration": duration, "status": "error"})
            raise
    return wrapper

@mcp.tool()
@track_performance
def my_tool(data: str) -> str:
    """Process data.

    Args:
        data: Input data
    """
    return process(data)
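
The decorator above wraps synchronous functions. Applied to an `async def` tool, it would time only the creation of the coroutine, because calling an async function returns immediately. The following is a sketch of one way to cover both cases with `inspect.iscoroutinefunction`; the `_record` helper and the `slow_lookup` demo function are illustrative names.

```python
import asyncio
import inspect
import time
from collections import defaultdict
from functools import wraps

tool_metrics = defaultdict(list)

def _record(name: str, start: float, status: str) -> None:
    """Append one sample with the elapsed time in milliseconds."""
    duration_ms = (time.monotonic() - start) * 1000
    tool_metrics[name].append({"duration": duration_ms, "status": status})

def track_performance(func):
    """Record duration and status for sync or async callables."""
    if inspect.iscoroutinefunction(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.monotonic()
            try:
                result = await func(*args, **kwargs)  # time the awaited work
            except Exception:
                _record(func.__name__, start, "error")
                raise
            _record(func.__name__, start, "success")
            return result
        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.monotonic()
        try:
            result = func(*args, **kwargs)
        except Exception:
            _record(func.__name__, start, "error")
            raise
        _record(func.__name__, start, "success")
        return result
    return sync_wrapper

# Demo with a stand-in async tool (hypothetical)
@track_performance
async def slow_lookup(key: str) -> str:
    await asyncio.sleep(0.01)
    return key.upper()

result = asyncio.run(slow_lookup("abc"))
print(result, tool_metrics["slow_lookup"])
```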

Caching Strategies

Caching is usually the highest-impact optimization for MCP servers that access external data or perform expensive computations, because a cache hit skips the network round trip or the computation entirely.

In-Memory Cache

typescript
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';

interface CacheEntry<T> {
  value: T;
  expiresAt: number;
}

class SimpleCache<T> {
  private cache = new Map<string, CacheEntry<T>>();
  private maxSize: number;

  constructor(maxSize = 1000) {
    this.maxSize = maxSize;
  }

  get(key: string): T | undefined {
    const entry = this.cache.get(key);
    if (!entry) return undefined;
    if (Date.now() > entry.expiresAt) {
      this.cache.delete(key);
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: T, ttlMs: number): void {
    if (!this.cache.has(key) && this.cache.size >= this.maxSize) {
      // Evict the oldest inserted entry (a Map iterates in insertion order)
      const firstKey = this.cache.keys().next().value;
      if (firstKey !== undefined) this.cache.delete(firstKey);
    }
    this.cache.set(key, { value, expiresAt: Date.now() + ttlMs });
  }
}

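const cache = new SimpleCache<string>();

// Server instance used by the tool registration below (name and version are example values)
const server = new McpServer({ name: 'weather-server', version: '1.0.0' });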

server.tool(
  'get-weather',
  'Get weather for a city',
  { city: z.string() },
  async ({ city }) => {
    const cacheKey = `weather:${city.toLowerCase()}`;
    const cached = cache.get(cacheKey);
    if (cached) {
      return { content: [{ type: 'text', text: cached }] };
    }

    const weather = await fetchWeather(city);
    const result = JSON.stringify(weather);
    cache.set(cacheKey, result, 300000); // Cache for 5 minutes

    return { content: [{ type: 'text', text: result }] };
  }
);

Python Caching with TTL

python
import json
from time import time
from typing import Any

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("cached-server")

class TTLCache:
    def __init__(self, ttl_seconds: int = 300, max_size: int = 1000):
        self._cache: dict[str, tuple[float, Any]] = {}
        self._ttl = ttl_seconds
        self._max_size = max_size

    def get(self, key: str):
        if key in self._cache:
            expires, value = self._cache[key]
            if time() < expires:
                return value
            del self._cache[key]
        return None

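    def set(self, key: str, value):
        if len(self._cache) >= self._max_size:
            # Remove expired entries first
            now = time()
            self._cache = {k: v for k, v in self._cache.items() if v[0] > now}
            # Still full: evict the oldest inserted entry so the cache stays bounded
            if len(self._cache) >= self._max_size:
                del self._cache[next(iter(self._cache))]
        self._cache[key] = (time() + self._ttl, value)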

cache = TTLCache(ttl_seconds=300)

@mcp.tool()
def get_user_profile(user_id: str) -> str:
    """Get user profile with caching.

    Args:
        user_id: User ID to look up
    """
    cached = cache.get(f"user:{user_id}")
    if cached is not None:
        return cached

    profile = fetch_profile_from_db(user_id)
    result = json.dumps(profile)
    cache.set(f"user:{user_id}", result)
    return result

Redis Cache for Multi-Instance Deployments

When running multiple MCP server instances (in Kubernetes, for example), use Redis for shared caching:

typescript
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL);

async function cachedFetch<T>(key: string, ttlSeconds: number, fetchFn: () => Promise<T>): Promise<T> {
  const cached = await redis.get(key);
  if (cached) {
    return JSON.parse(cached);
  }

  const result = await fetchFn();
  await redis.setex(key, ttlSeconds, JSON.stringify(result));
  return result;
}

server.tool(
  'query-database',
  'Query with caching',
  { query: z.string() },
  async ({ query }) => {
    const result = await cachedFetch(
      `query:${Buffer.from(query).toString('base64')}`,
      60,
      () => executeQuery(query)
    );
    return { content: [{ type: 'text', text: JSON.stringify(result) }] };
  }
);

Connection Pooling

Database Connection Pools

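Never open a new database connection per tool call. Each new connection pays for a network handshake and authentication before the first query runs. Create a pool once at startup and borrow connections from it: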

typescript
import { Pool } from 'pg';

// Create pool once at startup
const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
  max: 20,              // Maximum connections
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 5000,
});

server.tool(
  'query',
  'Execute a database query',
  { sql: z.string() },
  async ({ sql }) => {
    const client = await pool.connect();
    try {
      const result = await client.query(sql);
      return { content: [{ type: 'text', text: JSON.stringify(result.rows) }] };
    } finally {
      client.release(); // Return connection to pool
    }
  }
);

HTTP Connection Reuse

python
import httpx
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("pooled-server")

# Create a shared HTTP client with connection pooling
http_client = httpx.AsyncClient(
    timeout=30.0,
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
)

@mcp.tool()
async def fetch_api(endpoint: str) -> str:
    """Fetch data from an API with connection reuse.

    Args:
        endpoint: API endpoint to call
    """
    response = await http_client.get(f"https://api.example.com/{endpoint}")
    return response.text

Rate Limiting

Rate limiting protects your server and downstream services from overload.

Token Bucket Algorithm

typescript
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private maxTokens: number,
    private refillRate: number, // tokens per second
  ) {
    this.tokens = maxTokens;
    this.lastRefill = Date.now();
  }

  tryConsume(count = 1): boolean {
    this.refill();
    if (this.tokens >= count) {
      this.tokens -= count;
      return true;
    }
    return false;
  }

  private refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.maxTokens, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;
  }
}

// Per-tool rate limiters
const rateLimiters = new Map<string, TokenBucket>();

function getRateLimiter(tool: string, maxPerMinute: number): TokenBucket {
  if (!rateLimiters.has(tool)) {
    rateLimiters.set(tool, new TokenBucket(maxPerMinute, maxPerMinute / 60));
  }
  return rateLimiters.get(tool)!;
}

Tiered Rate Limits

python
from enum import Enum

class Tier(Enum):
    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"

RATE_LIMITS = {
    Tier.FREE: {"requests_per_minute": 10, "daily_limit": 100},
    Tier.PRO: {"requests_per_minute": 60, "daily_limit": 10000},
    Tier.ENTERPRISE: {"requests_per_minute": 300, "daily_limit": 1000000},
}
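
The table above only declares the limits. The following is a sketch of how the per-minute limit might be enforced with one token bucket per client, assuming the client's tier is already known from authentication. The `TokenBucket` class mirrors the TypeScript version shown earlier; `allow_request` and the injectable `clock` parameter are illustrative additions, and the daily limit is left out for brevity.

```python
import time
from enum import Enum

class Tier(Enum):
    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"

RATE_LIMITS = {
    Tier.FREE: {"requests_per_minute": 10, "daily_limit": 100},
    Tier.PRO: {"requests_per_minute": 60, "daily_limit": 10000},
    Tier.ENTERPRISE: {"requests_per_minute": 300, "daily_limit": 1000000},
}

class TokenBucket:
    def __init__(self, max_tokens: float, refill_per_second: float, clock=time.monotonic):
        self.max_tokens = max_tokens
        self.refill_per_second = refill_per_second
        self.clock = clock  # injectable so the bucket can be tested without sleeping
        self.tokens = float(max_tokens)
        self.last_refill = clock()

    def try_consume(self, count: float = 1) -> bool:
        # Refill in proportion to elapsed time, capped at the bucket size
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.max_tokens, self.tokens + elapsed * self.refill_per_second)
        self.last_refill = now
        if self.tokens >= count:
            self.tokens -= count
            return True
        return False

_buckets: dict[str, TokenBucket] = {}

def allow_request(client_id: str, tier: Tier, clock=time.monotonic) -> bool:
    """Return True if this client may make a request now (per-minute limit only)."""
    if client_id not in _buckets:
        per_minute = RATE_LIMITS[tier]["requests_per_minute"]
        _buckets[client_id] = TokenBucket(per_minute, per_minute / 60, clock)
    return _buckets[client_id].try_consume()
```

A tool handler would call `allow_request(client_id, tier)` before doing any work and return a "rate limit exceeded" error when it gets `False`. With multiple server instances, the buckets would need to live in shared storage such as Redis rather than in process memory.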

Async and Concurrent Processing

Parallel Tool Execution

When a tool needs to fetch data from multiple sources, issue the requests concurrently rather than one after another:

python
import asyncio
import json

import httpx
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("parallel-server")

# Shared client so concurrent requests reuse pooled connections
http_client = httpx.AsyncClient(timeout=30.0)

@mcp.tool()
async def aggregate_data(sources: list[str]) -> str:
    """Fetch and aggregate data from multiple sources in parallel.

    Args:
        sources: List of data source identifiers
    """
    async def fetch_source(source: str) -> dict:
        response = await http_client.get(f"https://api.example.com/{source}")
        response.raise_for_status()  # Report HTTP errors as failures
        return {"source": source, "data": response.json()}

    # Fetch all sources concurrently
    tasks = [fetch_source(s) for s in sources]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successful = [r for r in results if not isinstance(r, Exception)]
    failed = [str(r) for r in results if isinstance(r, Exception)]

    return json.dumps({
        "results": successful,
        "errors": failed,
        "total": len(sources),
        "succeeded": len(successful),
    })
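
Unbounded `asyncio.gather` starts every request at once, which can overwhelm a downstream API when the source list is long. The following is a sketch of capping the number of in-flight calls with `asyncio.Semaphore`; `gather_bounded` and the `fake_fetch` stand-in are illustrative names, and the limit of 3 is an arbitrary example value.

```python
import asyncio

async def gather_bounded(coro_fns, limit: int = 5):
    """Run zero-argument coroutine functions with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run(fn):
        async with semaphore:  # wait here while `limit` calls are already running
            return await fn()

    # Results come back in input order; failures are returned as exception objects
    return await asyncio.gather(*(run(fn) for fn in coro_fns), return_exceptions=True)

# Demo with a stand-in for a network call that tracks peak concurrency
in_flight = 0
peak = 0

async def fake_fetch(source: str) -> dict:
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return {"source": source}

async def main():
    sources = [f"s{i}" for i in range(10)]
    return await gather_bounded([lambda s=s: fake_fetch(s) for s in sources], limit=3)

results = asyncio.run(main())
print(len(results), "results, peak concurrency:", peak)
```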

Response Optimization

Truncate Large Responses

typescript
function truncateIfLarge(data: string, threshold = 10000): string {
  if (data.length < threshold) return data;

  // For very large array results, return the first 100 items and a note to paginate
  let parsed: unknown;
  try {
    parsed = JSON.parse(data);
  } catch {
    return data; // Not JSON; leave the payload unchanged
  }
  if (Array.isArray(parsed) && parsed.length > 100) {
    return JSON.stringify({
      total: parsed.length,
      showing: 100,
      data: parsed.slice(0, 100),
      message: 'Results truncated. Use pagination for full results.',
    });
  }
  return data;
}

Pagination for Large Datasets

python
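import json

# Table names cannot be bound as SQL parameters, so validate them against an
# allowlist. ALLOWED_TABLES is an example; list the tables you want to expose.
ALLOWED_TABLES = {"users", "orders"}

@mcp.tool()
def query_with_pagination(
    table: str,
    page: int = 1,
    page_size: int = 50
) -> str:
    """Query data with pagination.

    Args:
        table: Table to query
        page: Page number (starting from 1)
        page_size: Results per page (max 100)
    """
    if table not in ALLOWED_TABLES:
        raise ValueError(f"Unknown table: {table}")

    page = max(page, 1)
    page_size = max(1, min(page_size, 100))
    offset = (page - 1) * page_size

    results = db.execute(
        f"SELECT * FROM {table} LIMIT ? OFFSET ?",
        [page_size, offset]
    )
    total = db.execute(f"SELECT COUNT(*) FROM {table}")[0][0]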

    return json.dumps({
        "data": results,
        "page": page,
        "page_size": page_size,
        "total": total,
        "total_pages": (total + page_size - 1) // page_size,
    })

Monitoring and Alerting

Prometheus Metrics

typescript
import { Counter, Histogram, Gauge, register } from 'prom-client';

const toolCallsTotal = new Counter({
  name: 'mcp_tool_calls_total',
  help: 'Total MCP tool calls',
  labelNames: ['tool', 'status'],
});

const toolDuration = new Histogram({
  name: 'mcp_tool_duration_seconds',
  help: 'MCP tool call duration',
  labelNames: ['tool'],
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 5, 10],
});

const activeConnections = new Gauge({
  name: 'mcp_active_connections',
  help: 'Current active MCP connections',
});

// Expose metrics endpoint
app.get('/metrics', async (req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});

Health Check Endpoint

python
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import json
import time

class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        health = {
            "status": "healthy",
            "uptime": time.time() - START_TIME,
            # Reads the TTLCache instance defined earlier through its private dict
            "cache_size": len(cache._cache),
        }
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(health).encode())

    def log_message(self, format, *args):
        pass

START_TIME = time.time()
threading.Thread(
    target=lambda: HTTPServer(("0.0.0.0", 8080), HealthHandler).serve_forever(),
    daemon=True,
).start()

Performance Checklist

  • [ ] Tool response times are under 5 seconds (p95)
  • [ ] Database queries use connection pooling
  • [ ] Frequently accessed data is cached with appropriate TTLs
  • [ ] HTTP clients reuse connections
  • [ ] Large responses are paginated
  • [ ] Rate limiting prevents resource exhaustion
  • [ ] Metrics are collected and monitored
  • [ ] Health checks are configured
  • [ ] Async operations run concurrently where possible
  • [ ] Memory usage is stable under load (no leaks)

Conclusion

MCP server performance directly impacts the AI user experience. Start by measuring your baseline, then apply caching and connection pooling for the biggest gains. Add rate limiting to protect your infrastructure, and set up monitoring to catch regressions early. These optimizations apply whether you deploy on Docker, Kubernetes, or AWS Lambda.

For more optimization patterns, browse our servers directory and review production MCP server implementations.

Code Examples

In-Memory Cache with TTL

typescript
class SimpleCache<T> {
  private cache = new Map<string, { value: T; expiresAt: number }>();

  get(key: string): T | undefined {
    const entry = this.cache.get(key);
    if (!entry || Date.now() > entry.expiresAt) {
      this.cache.delete(key);
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: T, ttlMs: number): void {
    this.cache.set(key, { value, expiresAt: Date.now() + ttlMs });
  }
}

const cache = new SimpleCache<string>();

Connection Pooling (Python)

python
import httpx
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("pooled-server")

# Shared client with connection pooling
client = httpx.AsyncClient(
    timeout=30.0,
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
)

@mcp.tool()
async def fetch(url: str) -> str:
    """Fetch a URL with connection reuse.

    Args:
        url: URL to fetch
    """
    response = await client.get(url)
    return response.text

Token Bucket Rate Limiter

typescript
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(private max: number, private refillRate: number) {
    this.tokens = max;
    this.lastRefill = Date.now();
  }

  tryConsume(): boolean {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.max, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;

    if (this.tokens >= 1) {
      this.tokens--;
      return true;
    }
    return false;
  }
}

Key Takeaways

  • Measure baseline performance with instrumentation before optimizing
  • Caching with TTL is the highest-impact optimization for most MCP servers
  • Connection pooling for databases and HTTP clients reduces latency significantly
  • Token bucket rate limiting protects both your server and downstream services
  • Paginate large responses to keep tool calls fast and within client limits

Troubleshooting

Cache hit rate is very low despite caching being enabled

Check your cache key strategy. Ensure keys are normalized (e.g., lowercased, with whitespace trimmed). If entries expire before they are reused, increase the TTL. Monitor cache size to confirm that entries are not being evicted early because the cache has hit its size limit.
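
Both checks are easier with a cache that normalizes keys in one place and counts its own hits and misses. The following is a minimal sketch of that idea; `CountingCache` is an illustrative class, and TTL handling is omitted to keep the focus on keys and counters.

```python
class CountingCache:
    """Dictionary-backed cache that normalizes keys and counts hits and misses."""

    def __init__(self):
        self._data: dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    @staticmethod
    def normalize(key: str) -> str:
        # Collapse case and whitespace so "  Paris" and "paris " share one entry
        return " ".join(key.lower().split())

    def get(self, key: str):
        value = self._data.get(self.normalize(key))
        if value is None:
            self.misses += 1
        else:
            self.hits += 1
        return value

    def set(self, key: str, value: str) -> None:
        self._data[self.normalize(key)] = value

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

cache = CountingCache()
cache.get("Paris")         # miss: nothing stored yet
cache.set("Paris", "18C")
cache.get("  paris ")      # hit after normalization
cache.get("PARIS")         # hit
print(f"hit rate: {cache.hit_rate:.2f}")
```

Exposing `hit_rate` on a metrics or health endpoint makes it possible to see whether a TTL or key change actually helped.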

Database connection pool is exhausted under load

Increase the pool max size. Ensure connections are properly released after use (use try/finally). Check for long-running queries that hold connections. Add connection timeout settings to prevent indefinite waiting.
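
The release and timeout advice can be seen in a small hand-rolled pool. The following is a sketch only, using `sqlite3` and `queue.Queue` from the standard library as stand-ins for a real database driver and its pool; `ConnectionPool` is an illustrative class, and production code would normally rely on the driver's own pooling.

```python
import queue
import sqlite3
from contextlib import contextmanager

class ConnectionPool:
    """Fixed-size pool; sqlite3 stands in for a real database driver."""

    def __init__(self, database: str, size: int = 5):
        self._idle: queue.Queue = queue.Queue(maxsize=size)
        for _ in range(size):
            # check_same_thread=False lets a pooled connection move between threads
            self._idle.put(sqlite3.connect(database, check_same_thread=False))

    @contextmanager
    def connection(self, timeout: float = 5.0):
        try:
            # Fail fast instead of waiting forever when the pool is exhausted
            conn = self._idle.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("no database connection available") from None
        try:
            yield conn
        finally:
            self._idle.put(conn)  # always return the connection, even on error

pool = ConnectionPool(":memory:", size=2)

with pool.connection() as conn:
    value = conn.execute("SELECT 1 + 1").fetchone()[0]
print(value)
```

Because the `finally` clause runs whether the query succeeds or raises, a failing tool call cannot leak a connection, which is the most common cause of a pool draining under load.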

Memory usage grows continuously over time

Check for unbounded caches or metric arrays that grow without cleanup. Implement cache eviction policies and periodic cleanup. Use heap profiling tools (node --inspect, tracemalloc in Python) to identify leak sources.
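
For a Python server, both suggestions can be sketched with the standard library. The example below bounds a metrics buffer with `collections.deque(maxlen=...)` and uses `tracemalloc` to list the source lines holding the most memory; `MAX_SAMPLES`, `record`, and `top_allocations` are illustrative names, and the cap of 10,000 is an assumed value.

```python
import tracemalloc
from collections import deque

# Keep only the most recent samples; older ones are dropped automatically
MAX_SAMPLES = 10_000  # assumed cap; size it to your metrics window
metrics: deque = deque(maxlen=MAX_SAMPLES)

def record(tool: str, duration_ms: float) -> None:
    metrics.append({"tool": tool, "duration": duration_ms})

def top_allocations(limit: int = 5) -> list[str]:
    """Return the source lines currently holding the most traced memory."""
    snapshot = tracemalloc.take_snapshot()
    return [str(stat) for stat in snapshot.statistics("lineno")[:limit]]

tracemalloc.start()
for i in range(25_000):
    record("demo", float(i))
report = top_allocations()
tracemalloc.stop()

print(len(metrics))  # stays at MAX_SAMPLES no matter how many calls were recorded
for line in report:
    print(line)
```

Comparing two snapshots taken a few minutes apart (with `snapshot.compare_to`) is often more useful than a single one, since it shows which lines are growing rather than which are merely large.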

Next Steps

  • Set up Prometheus monitoring for your MCP servers
  • Implement Redis caching for multi-instance deployments
  • Add rate limiting based on client authentication tiers
  • Profile your slowest tools and optimize their implementations
