30 min read
Advanced
Security

MCP Authentication Implementation

Implement authentication for MCP servers using OAuth 2.0, JWT tokens, API keys, and role-based access control

MCPgee Team

MCP Expert

Prerequisites

  • A working MCP server with Streamable HTTP transport
  • Understanding of OAuth 2.0 and JWT concepts
  • Familiarity with MCP security fundamentals
  • Node.js or Python development experience

Introduction

Authentication is essential for any MCP server exposed over the network. Without it, anyone who discovers your server endpoint can invoke tools, access resources, and potentially compromise your systems. This tutorial covers implementing authentication for MCP servers using multiple strategies, from simple API keys to full OAuth 2.0 flows.

For foundational security practices, read our security fundamentals tutorial first.

Authentication Strategies Overview

Strategy      Complexity   Best For
API Keys      Low          Internal tools, development
JWT Tokens    Medium       Service-to-service, custom auth
OAuth 2.0     High         Third-party access, user delegation
mTLS          High         High-security environments

API Key Authentication

API keys are the simplest approach and suit internal or development MCP servers.

TypeScript Implementation

typescript
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import express from 'express';
import crypto from 'crypto';
import { z } from 'zod';

const app = express();
app.use(express.json());

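// Store API keys securely (in production, use a database or secrets manager).
// Trim entries and drop empty ones so a blank MCP_API_KEYS never yields a usable key.
const API_KEYS = new Set(
  (process.env.MCP_API_KEYS ?? '').split(',').map((k) => k.trim()).filter(Boolean)
);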

function apiKeyAuth(req: express.Request, res: express.Response, next: express.NextFunction) {
  const apiKey = req.headers['x-api-key'] as string;

  if (!apiKey) {
    return res.status(401).json({ error: 'Missing X-API-Key header' });
  }

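  // Use timing-safe comparison to prevent timing attacks.
  // Hash both sides first: crypto.timingSafeEqual throws if the buffers differ in length.
  const digest = (value: string) => crypto.createHash('sha256').update(value).digest();
  const provided = digest(apiKey);
  const isValid = Array.from(API_KEYS).some(
    (key) => crypto.timingSafeEqual(provided, digest(key))
  );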

  if (!isValid) {
    return res.status(403).json({ error: 'Invalid API key' });
  }

  next();
}

const server = new McpServer({ name: 'authenticated-server', version: '1.0.0' });

server.tool('hello', 'Greet someone', { name: z.string() }, async ({ name }) => ({
  content: [{ type: 'text', text: `Hello, ${name}!` }],
}));

const transport = new StreamableHTTPServerTransport({
  sessionIdGenerator: () => crypto.randomUUID(),
});

app.use('/mcp', apiKeyAuth);
// express.json() has already consumed the request stream, so pass the parsed body along
app.post('/mcp', (req, res) => transport.handleRequest(req, res, req.body));
app.get('/mcp', (req, res) => transport.handleRequest(req, res));
app.delete('/mcp', (req, res) => transport.handleRequest(req, res));

await server.connect(transport);
app.listen(3000);

Python Implementation

python
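import hmac
import os

import uvicorn
from mcp.server.fastmcp import FastMCP
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

# Drop empty entries: "".split(",") yields [""], which would accept an empty key
API_KEYS = {k.strip() for k in os.environ.get("MCP_API_KEYS", "").split(",") if k.strip()}

class APIKeyMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        api_key = request.headers.get("x-api-key", "")
        if not api_key:
            return JSONResponse({"error": "Missing X-API-Key header"}, status_code=401)
        # Compare bytes: compare_digest rejects non-ASCII str arguments
        provided = api_key.encode("utf-8")
        if not any(hmac.compare_digest(provided, key.encode("utf-8")) for key in API_KEYS):
            return JSONResponse({"error": "Invalid API key"}, status_code=403)
        return await call_next(request)

mcp = FastMCP("secure-server")

@mcp.tool()
def hello(name: str) -> str:
    """Greet someone.

    Args:
        name: Name to greet
    """
    return f"Hello, {name}!"

# Attach the middleware to the Streamable HTTP app. This assumes an SDK version
# that provides FastMCP.streamable_http_app(); adjust for the version you use.
app = mcp.streamable_http_app()
app.add_middleware(APIKeyMiddleware)

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=3000)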
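
The comment about storing API keys securely can be taken one step further by keeping only digests of the keys and comparing fixed-length hashes. The following is a minimal sketch using only the standard library; `hash_key`, `load_key_hashes`, and `is_valid_key` are hypothetical helper names introduced here for illustration, not part of the MCP SDK.

```python
import hashlib
import hmac


def hash_key(key: str) -> bytes:
    """Return a fixed-length SHA-256 digest of an API key."""
    return hashlib.sha256(key.encode("utf-8")).digest()


def load_key_hashes(raw: str) -> set[bytes]:
    """Parse a comma-separated key list, ignoring blanks and surrounding whitespace."""
    return {hash_key(k.strip()) for k in raw.split(",") if k.strip()}


def is_valid_key(candidate: str, key_hashes: set[bytes]) -> bool:
    """Compare the candidate against every stored digest in constant time per digest."""
    if not candidate:
        return False
    candidate_hash = hash_key(candidate)
    valid = False
    for stored in key_hashes:
        # No early exit, so the loop does the same work whichever key matches
        valid |= hmac.compare_digest(candidate_hash, stored)
    return valid
```

Because every digest has the same length, the comparison never depends on how long the submitted key is, and a blank environment variable produces an empty set rather than an accepted empty key.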

JWT Token Authentication

JWTs (JSON Web Tokens) provide stateless authentication: the server verifies the token's signature and reads the embedded claims without a session lookup.

Generating and Verifying Tokens

typescript
import jwt from 'jsonwebtoken';
import express from 'express';

const app = express();
app.use(express.json());

const JWT_SECRET = process.env.JWT_SECRET!;
const JWT_ISSUER = 'mcp-auth-server';

// Token generation endpoint
app.post('/auth/token', (req, res) => {
  const { clientId, clientSecret } = req.body;

  // validateClient is a placeholder: look the client up in your database and
  // check the secret with a timing-safe comparison
  const client = validateClient(clientId, clientSecret);
  if (!client) {
    return res.status(401).json({ error: 'Invalid credentials' });
  }

  const token = jwt.sign(
    {
      sub: clientId,
      role: client.role,
      permissions: client.permissions,
    },
    JWT_SECRET,
    {
      issuer: JWT_ISSUER,
      expiresIn: '1h',
      algorithm: 'HS256',
    }
  );

  res.json({
    access_token: token,
    token_type: 'Bearer',
    expires_in: 3600,
  });
});

// JWT verification middleware
function jwtAuth(req: express.Request, res: express.Response, next: express.NextFunction) {
  const authHeader = req.headers.authorization;

  if (!authHeader || !authHeader.startsWith('Bearer ')) {
    return res.status(401).json({ error: 'Missing Bearer token' });
  }

  const token = authHeader.slice(7);

  try {
    const decoded = jwt.verify(token, JWT_SECRET, {
      issuer: JWT_ISSUER,
      algorithms: ['HS256'],
    });
    (req as any).auth = decoded;
    next();
  } catch (error) {
    if (error instanceof jwt.TokenExpiredError) {
      return res.status(401).json({ error: 'Token expired' });
    }
    return res.status(401).json({ error: 'Invalid token' });
  }
}

app.use('/mcp', jwtAuth);

Python JWT Verification

python
import os
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT

JWT_SECRET = os.environ["JWT_SECRET"]

def create_token(client_id: str, role: str, permissions: list[str]) -> str:
    # Use an aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.now(timezone.utc)
    payload = {
        "sub": client_id,
        "role": role,
        "permissions": permissions,
        "iat": now,
        "exp": now + timedelta(hours=1),
        "iss": "mcp-auth-server",
    }
    return jwt.encode(payload, JWT_SECRET, algorithm="HS256")

def verify_token(token: str) -> dict:
    try:
        return jwt.decode(
            token,
            JWT_SECRET,
            algorithms=["HS256"],
            issuer="mcp-auth-server",
        )
    except jwt.ExpiredSignatureError:
        raise ValueError("Token expired")
    except jwt.InvalidTokenError:
        raise ValueError("Invalid token")
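
To make the checks a JWT library performs more concrete, the sketch below signs and verifies an HS256 token with only the standard library. It is an illustration of the mechanism, not a replacement for PyJWT or jsonwebtoken; `sign_hs256` and `verify_hs256` are hypothetical names, and the `now` parameter exists only so the expiry check can be exercised deterministically.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256).digest()


def sign_hs256(claims: dict, secret: str) -> str:
    """Build header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input, secret))}"


def verify_hs256(token: str, secret: str, issuer: str, now: Optional[float] = None) -> dict:
    """Check structure, algorithm, signature, issuer, and expiry, in that order."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        signature = _b64url_decode(signature_b64)
    except ValueError as exc:
        raise ValueError("Malformed token") from exc
    # Pin the algorithm rather than trusting whatever the header claims
    if not isinstance(header, dict) or header.get("alg") != "HS256":
        raise ValueError("Unexpected algorithm")
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    if not hmac.compare_digest(signature, expected):
        raise ValueError("Bad signature")
    claims = json.loads(_b64url_decode(payload_b64))
    if claims.get("iss") != issuer:
        raise ValueError("Wrong issuer")
    current = time.time() if now is None else now
    if claims.get("exp", 0) <= current:
        raise ValueError("Token expired")
    return claims
```

The order matters: claims are only read after the signature has been verified, so nothing in an unauthenticated payload influences the decision.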

OAuth 2.0 Authentication

OAuth 2.0 is the recommended approach for production MCP servers that support third-party access.

Client Credentials Flow

For service-to-service authentication:

typescript
import express from 'express';
import crypto from 'crypto';

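const app = express();
app.use(express.json());
// OAuth 2.0 token requests are form-encoded (RFC 6749), so parse that body type as well
app.use(express.urlencoded({ extended: false }));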

// OAuth client registry (use a database in production)
const clients = new Map([
  ['client-123', {
    secret: 'hashed-secret-here',
    scopes: ['tools:read', 'tools:execute'],
    name: 'AI Assistant Client',
  }],
]);

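// Token endpoint. verifySecret, generateAccessToken, and verifyAccessToken are
// placeholders for your own implementations.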
app.post('/oauth/token', async (req, res) => {
  const { grant_type, client_id, client_secret, scope } = req.body;

  if (grant_type !== 'client_credentials') {
    return res.status(400).json({ error: 'unsupported_grant_type' });
  }

  const client = clients.get(client_id);
  if (!client || !verifySecret(client_secret, client.secret)) {
    return res.status(401).json({ error: 'invalid_client' });
  }

  // Validate requested scopes
  const requestedScopes = (scope || '').split(' ');
  const grantedScopes = requestedScopes.filter((s: string) => client.scopes.includes(s));

  const accessToken = generateAccessToken({
    sub: client_id,
    scopes: grantedScopes,
  });

  res.json({
    access_token: accessToken,
    token_type: 'Bearer',
    expires_in: 3600,
    scope: grantedScopes.join(' '),
  });
});

// Protected MCP endpoint
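app.use('/mcp', (req, res, next) => {
  // Accept only the Bearer scheme instead of stripping the prefix when it happens to be there
  const header = req.headers.authorization ?? '';
  const token = header.startsWith('Bearer ') ? header.slice(7) : '';
  if (!token) {
    return res.status(401).json({ error: 'Missing token' });
  }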

  const payload = verifyAccessToken(token);
  if (!payload) {
    return res.status(401).json({ error: 'Invalid token' });
  }

  (req as any).auth = payload;
  next();
});
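
The scope handling in the token endpoint above amounts to intersecting what the client asked for with what it is registered for. A minimal Python sketch of that step follows; `grant_scopes` and `has_scope` are hypothetical helpers, and returning an empty grant for an empty request is an assumption (a server may instead apply a default scope).

```python
from typing import Iterable


def grant_scopes(requested: str, allowed: Iterable[str]) -> list[str]:
    """Return the requested scopes the client may have, in request order."""
    allowed_set = set(allowed)
    granted: list[str] = []
    # split() with no argument drops empty strings and repeated spaces
    for scope in requested.split():
        if scope in allowed_set and scope not in granted:
            granted.append(scope)
    return granted


def has_scope(token_scopes: Iterable[str], required: str) -> bool:
    """Check that a verified token carries the scope an operation needs."""
    return required in set(token_scopes)
```

A protected handler would call `has_scope` with the scopes from the verified token payload, for example requiring `tools:execute` before running a tool.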

Authorization Code Flow

For user-delegated access (browser-based):

typescript
// Authorization endpoint - renders consent screen
app.get('/oauth/authorize', (req, res) => {
  const { client_id, redirect_uri, response_type, scope, state } = req.query;

  if (response_type !== 'code') {
    return res.status(400).json({ error: 'unsupported_response_type' });
  }

  // Render authorization consent page
  res.render('authorize', { client_id, scope, state, redirect_uri });
});

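// Handle user consent
app.post('/oauth/authorize', (req, res) => {
  const { client_id, redirect_uri, scope, state, approved } = req.body;

  // Only redirect to URIs registered for this client; otherwise this endpoint
  // is an open redirect. isRegisteredRedirectUri is a placeholder helper.
  if (!isRegisteredRedirectUri(client_id, redirect_uri)) {
    return res.status(400).json({ error: 'invalid_request' });
  }

  // Build the redirect with URL so that state and code are properly encoded
  const target = new URL(redirect_uri);
  if (state) target.searchParams.set('state', state);

  if (!approved) {
    target.searchParams.set('error', 'access_denied');
    return res.redirect(target.toString());
  }

  target.searchParams.set('code', generateAuthorizationCode(client_id, scope));
  res.redirect(target.toString());
});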

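// Token exchange endpoint. In a real server, handle every grant type inside a single
// /oauth/token route, because Express stops at the first handler that sends a response.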
app.post('/oauth/token', (req, res) => {
  const { grant_type, code, redirect_uri, client_id, client_secret } = req.body;

  if (grant_type === 'authorization_code') {
    const authData = validateAuthorizationCode(code, client_id, redirect_uri);
    if (!authData) {
      return res.status(400).json({ error: 'invalid_grant' });
    }

    const accessToken = generateAccessToken({ sub: authData.userId, scopes: authData.scopes });
    const refreshToken = generateRefreshToken(authData.userId);

    res.json({
      access_token: accessToken,
      refresh_token: refreshToken,
      token_type: 'Bearer',
      expires_in: 3600,
    });
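  } else {
    // Without this branch, other grant types would leave the request hanging
    return res.status(400).json({ error: 'unsupported_grant_type' });
  }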
});
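
The flow above relies on `generateAuthorizationCode` and `validateAuthorizationCode`, which the tutorial does not define. One possible shape for them, sketched in Python under the assumption of an in-memory store: codes are random, expire quickly, can be redeemed once, and are bound to the client and redirect URI they were issued for. `AuthCodeStore` and its field names are hypothetical.

```python
import secrets
import time
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass(frozen=True)
class AuthCodeGrant:
    client_id: str
    redirect_uri: str
    user_id: str
    scopes: tuple[str, ...]
    expires_at: float


class AuthCodeStore:
    """In-memory store for short-lived, single-use authorization codes."""

    def __init__(self, ttl_seconds: float = 60.0, clock: Callable[[], float] = time.time):
        self._ttl = ttl_seconds
        self._clock = clock
        self._codes: dict[str, AuthCodeGrant] = {}

    def issue(self, client_id: str, redirect_uri: str, user_id: str, scopes: Iterable[str]) -> str:
        code = secrets.token_urlsafe(32)
        self._codes[code] = AuthCodeGrant(
            client_id, redirect_uri, user_id, tuple(scopes), self._clock() + self._ttl
        )
        return code

    def redeem(self, code: str, client_id: str, redirect_uri: str) -> Optional[AuthCodeGrant]:
        # pop() removes the code on the first attempt, which makes it single-use
        grant = self._codes.pop(code, None)
        if grant is None or grant.expires_at <= self._clock():
            return None
        if grant.client_id != client_id or grant.redirect_uri != redirect_uri:
            return None
        return grant
```

A failed redemption also consumes the code, which is a deliberate choice here: a code presented with the wrong client or redirect URI is treated as burned.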

Role-Based Access Control (RBAC)

Restrict tool access based on user roles:

typescript
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';

type Role = 'viewer' | 'editor' | 'admin';

interface ToolPermission {
  roles: Role[];
  rateLimit: number;
}

const toolPermissions: Record<string, ToolPermission> = {
  'read-data': { roles: ['viewer', 'editor', 'admin'], rateLimit: 100 },
  'write-data': { roles: ['editor', 'admin'], rateLimit: 50 },
  'delete-data': { roles: ['admin'], rateLimit: 10 },
  'manage-users': { roles: ['admin'], rateLimit: 20 },
};

function checkPermission(toolName: string, userRole: Role): boolean {
  const permission = toolPermissions[toolName];
  if (!permission) return false;
  return permission.roles.includes(userRole);
}

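// Middleware that requires authentication and exposes the caller's role.
// Tool handlers should then call checkPermission(toolName, role) before doing any work.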
app.use('/mcp', (req, res, next) => {
  const auth = (req as any).auth;
  if (!auth) {
    return res.status(401).json({ error: 'Not authenticated' });
  }

  // Store auth context for tool handlers
  (req as any).userRole = auth.role;
  next();
});
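
The permission table only has an effect once something consults it for each tool invocation. As a rough sketch in Python, the check can be applied to the JSON-RPC message itself: MCP tool invocations arrive as `tools/call` requests with the tool name in `params.name`. The names `TOOL_ROLES` and `authorize_message` are hypothetical, and the error code -32001 is an arbitrary choice from the implementation-defined JSON-RPC range, not a value mandated by MCP.

```python
from typing import Optional

# Mirrors the roles in the toolPermissions table above
TOOL_ROLES: dict[str, set[str]] = {
    "read-data": {"viewer", "editor", "admin"},
    "write-data": {"editor", "admin"},
    "delete-data": {"admin"},
    "manage-users": {"admin"},
}


def check_permission(tool_name: str, role: str) -> bool:
    """Unknown tools are denied, matching checkPermission in the TypeScript example."""
    return role in TOOL_ROLES.get(tool_name, set())


def authorize_message(message: dict, role: str) -> Optional[dict]:
    """Return a JSON-RPC error response if the call must be blocked, else None."""
    if message.get("method") != "tools/call":
        return None  # initialize, tools/list, and so on are not gated here
    params = message.get("params") or {}
    tool_name = params.get("name", "")
    if check_permission(tool_name, role):
        return None
    return {
        "jsonrpc": "2.0",
        "id": message.get("id"),
        "error": {"code": -32001, "message": f"Role '{role}' may not call '{tool_name}'"},
    }
```

A middleware would run `authorize_message` on the parsed request body and send the returned error instead of forwarding the request to the transport.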

Python RBAC

python
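import json
from enum import Enum
from functools import wraps

# get_current_user_role(), data_store, and mcp are assumed to be defined elsewhere
# in your server (mcp being the FastMCP instance).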

class Role(Enum):
    VIEWER = "viewer"
    EDITOR = "editor"
    ADMIN = "admin"

ROLE_HIERARCHY = {
    Role.ADMIN: {Role.VIEWER, Role.EDITOR, Role.ADMIN},
    Role.EDITOR: {Role.VIEWER, Role.EDITOR},
    Role.VIEWER: {Role.VIEWER},
}

def requires_role(minimum_role: Role):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_role = get_current_user_role()
            if minimum_role not in ROLE_HIERARCHY.get(current_role, set()):
                return json.dumps({"error": "Insufficient permissions"})
            return func(*args, **kwargs)
        return wrapper
    return decorator

@mcp.tool()
@requires_role(Role.VIEWER)
def read_data(key: str) -> str:
    """Read data (requires viewer role).

    Args:
        key: Data key to read
    """
    return json.dumps({"key": key, "value": data_store.get(key)})

@mcp.tool()
@requires_role(Role.ADMIN)
def delete_data(key: str) -> str:
    """Delete data (requires admin role).

    Args:
        key: Data key to delete
    """
    data_store.pop(key, None)
    return json.dumps({"status": "deleted"})
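
The TypeScript permission table earlier in this section carries a `rateLimit` value per tool that none of the examples enforce. The sketch below shows one way to apply such limits with a fixed-window counter per client and tool. The tutorial does not state the unit of `rateLimit`; requests per 60-second window is an assumption, and `FixedWindowRateLimiter` is a hypothetical name.

```python
import time
from typing import Callable

# Limits copied from the toolPermissions table; the per-minute unit is assumed
RATE_LIMITS = {"read-data": 100, "write-data": 50, "delete-data": 10, "manage-users": 20}


class FixedWindowRateLimiter:
    """Count calls per (client, tool) and reset the count when the window elapses."""

    def __init__(
        self,
        limits: dict[str, int],
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._limits = limits
        self._window = window_seconds
        self._clock = clock
        self._counters: dict[tuple[str, str], tuple[float, int]] = {}

    def allow(self, client_id: str, tool_name: str) -> bool:
        limit = self._limits.get(tool_name)
        if limit is None:
            return False  # tools without a configured limit are denied
        now = self._clock()
        key = (client_id, tool_name)
        window_start, count = self._counters.get(key, (now, 0))
        if now - window_start >= self._window:
            window_start, count = now, 0  # start a fresh window
        if count >= limit:
            return False
        self._counters[key] = (window_start, count + 1)
        return True
```

A fixed window is the simplest variant and allows short bursts at window boundaries; a sliding window or token bucket smooths that out at the cost of more state.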

Token Refresh and Session Management

Refresh Token Flow

typescript
app.post('/oauth/token', async (req, res) => {
  if (req.body.grant_type === 'refresh_token') {
    const { refresh_token } = req.body;

    const tokenData = validateRefreshToken(refresh_token);
    if (!tokenData) {
      return res.status(400).json({ error: 'invalid_grant' });
    }

    // Revoke old refresh token (rotation)
    revokeRefreshToken(refresh_token);

    // Issue new tokens
    const newAccessToken = generateAccessToken({ sub: tokenData.userId });
    const newRefreshToken = generateRefreshToken(tokenData.userId);

    res.json({
      access_token: newAccessToken,
      refresh_token: newRefreshToken,
      token_type: 'Bearer',
      expires_in: 3600,
    });
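  } else {
    // Without this branch, other grant types would leave the request hanging
    return res.status(400).json({ error: 'unsupported_grant_type' });
  }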
});
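
Rotation is easier to reason about with the bookkeeping spelled out. The Python sketch below keeps only digests of refresh tokens, makes each token single-use, and treats a second presentation of an already rotated token as a sign of theft by revoking every active token for that user. The reuse-detection step goes beyond what the handler above does and is an assumption of this sketch; `RefreshTokenRotator` is a hypothetical name.

```python
import hashlib
import secrets
from typing import Optional


def _digest(token: str) -> str:
    """Store digests so a leaked store does not expose usable tokens."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


class RefreshTokenRotator:
    def __init__(self) -> None:
        self._active: dict[str, str] = {}  # token digest -> user id
        self._used: dict[str, str] = {}    # digests of rotated-out tokens -> user id

    def issue(self, user_id: str) -> str:
        token = secrets.token_urlsafe(32)
        self._active[_digest(token)] = user_id
        return token

    def rotate(self, presented: str) -> Optional[str]:
        """Exchange a valid token for a new one; return None if it is not valid."""
        digest = _digest(presented)
        user_id = self._active.pop(digest, None)
        if user_id is not None:
            self._used[digest] = user_id
            return self.issue(user_id)
        # A token that was already rotated is being replayed: revoke the user's tokens
        reused_by = self._used.get(digest)
        if reused_by is not None:
            self.revoke_user(reused_by)
        return None

    def revoke_user(self, user_id: str) -> None:
        self._active = {d: u for d, u in self._active.items() if u != user_id}
```

After a replay is detected, the legitimate client's current token stops working as well, which forces a fresh login and cuts off whoever holds the stolen copy.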

MCP Client Authentication Configuration

Claude Desktop with Authenticated Server

json
{
  "mcpServers": {
    "authenticated-server": {
      "url": "https://mcp.example.com/mcp",
      "headers": {
        "Authorization": "Bearer your-token-here"
      }
    }
  }
}

VS Code Configuration

json
{
  "servers": {
    "secure-server": {
      "url": "https://mcp.example.com/mcp",
      "headers": {
        "X-API-Key": "your-api-key"
      }
    }
  }
}

For more on client configuration, see our Claude integration and VS Code tutorials.

Security Best Practices

  1. Never hardcode secrets in source code. Use environment variables or secrets managers.
  2. Use HTTPS for all authenticated MCP endpoints.
  3. Rotate secrets regularly and implement token expiration.
  4. Log authentication events for monitoring and incident response.
  5. Implement account lockout after repeated failed attempts.
  6. Use timing-safe comparisons for secret validation.
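
Item 5 in the list above can be sketched as a small failure tracker. This is a minimal in-memory illustration, not a hardened implementation; `FailedAttemptTracker`, the threshold of five failures, and the 300-second lockout are assumptions chosen for the example.

```python
import time
from typing import Callable


class FailedAttemptTracker:
    """Lock an identity (client id, API key id, or IP) after repeated failures."""

    def __init__(
        self,
        max_failures: int = 5,
        lockout_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max_failures = max_failures
        self._lockout = lockout_seconds
        self._clock = clock
        self._failures: dict[str, int] = {}
        self._locked_until: dict[str, float] = {}

    def is_locked(self, identity: str) -> bool:
        until = self._locked_until.get(identity)
        if until is None:
            return False
        if self._clock() >= until:
            # Lockout has elapsed: clear the state and start counting again
            del self._locked_until[identity]
            self._failures.pop(identity, None)
            return False
        return True

    def record_failure(self, identity: str) -> None:
        count = self._failures.get(identity, 0) + 1
        self._failures[identity] = count
        if count >= self._max_failures:
            self._locked_until[identity] = self._clock() + self._lockout

    def record_success(self, identity: str) -> None:
        self._failures.pop(identity, None)
        self._locked_until.pop(identity, None)
```

An authentication middleware would call `is_locked` before checking credentials, then `record_failure` or `record_success` depending on the outcome, and log each event as item 4 recommends.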

For comprehensive security guidance, review our MCP server security guide.

Conclusion

Authentication protects your MCP servers from unauthorized access. Start with API keys for development and internal tools, then upgrade to JWT or OAuth 2.0 for production deployments. Combine authentication with RBAC and audit logging for defense in depth. Remember that authentication is just one layer of security, and should always be paired with the practices covered in our security fundamentals tutorial.

Code Examples

API Key Authentication (TypeScript)

typescript
import express from 'express';
import crypto from 'crypto';

const API_KEYS = new Set(process.env.MCP_API_KEYS?.split(',') || []);

// Hash before comparing: crypto.timingSafeEqual throws when buffer lengths differ
const digest = (value: string) => crypto.createHash('sha256').update(value).digest();

function apiKeyAuth(req: express.Request, res: express.Response, next: express.NextFunction) {
  const key = req.headers['x-api-key'] as string;
  if (!key) return res.status(401).json({ error: 'Missing API key' });

  const provided = digest(key);
  const valid = Array.from(API_KEYS).some(
    (k) => crypto.timingSafeEqual(provided, digest(k))
  );
  if (!valid) return res.status(403).json({ error: 'Invalid API key' });
  next();
}

app.use('/mcp', apiKeyAuth);

JWT Verification (Python)

python
import os
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT

JWT_SECRET = os.environ["JWT_SECRET"]

def create_token(client_id: str, role: str) -> str:
    # Aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
    expires = datetime.now(timezone.utc) + timedelta(hours=1)
    return jwt.encode(
        {"sub": client_id, "role": role, "exp": expires},
        JWT_SECRET, algorithm="HS256",
    )

def verify_token(token: str) -> dict:
    return jwt.decode(token, JWT_SECRET, algorithms=["HS256"])

Role-Based Access Control

typescript
type Role = 'viewer' | 'editor' | 'admin';

const permissions: Record<string, Role[]> = {
  'read-data': ['viewer', 'editor', 'admin'],
  'write-data': ['editor', 'admin'],
  'delete-data': ['admin'],
};

function checkPermission(tool: string, role: Role): boolean {
  return permissions[tool]?.includes(role) ?? false;
}

Key Takeaways

  • API keys are simplest for internal tools; use JWT or OAuth 2.0 for production
  • Always use timing-safe comparisons when validating secrets to prevent timing attacks
  • Role-based access control restricts which tools different clients can invoke
  • Refresh token rotation makes each refresh token single-use, which limits how long a stolen token remains usable
  • Combine authentication with rate limiting and audit logging for defense in depth

Troubleshooting

JWT tokens are rejected even though they appear valid

Check that the issuer, audience, and algorithm match between token generation and verification. Verify that server clocks are synchronized, because clock skew can make a token look expired or not yet valid. Log the specific error the JWT library raises (for example TokenExpiredError in jsonwebtoken or ExpiredSignatureError in PyJWT) to see the exact validation failure.
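
When tracking down such a mismatch, it can help to look at what the token actually contains. The sketch below decodes the header and claims without verifying the signature, so it is suitable for debugging only and must never be used to make an access decision. `inspect_jwt` is a hypothetical helper, and the `now` parameter exists only to make the expiry calculation reproducible.

```python
import base64
import json
import time
from typing import Optional


def decode_segment(segment: str) -> dict:
    """Decode one base64url JWT segment, restoring the stripped padding."""
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def inspect_jwt(token: str, now: Optional[float] = None) -> dict:
    """Decode WITHOUT verifying the signature. For debugging only."""
    header_b64, payload_b64, _signature = token.split(".")
    header = decode_segment(header_b64)
    claims = decode_segment(payload_b64)
    current = time.time() if now is None else now
    exp = claims.get("exp")
    return {
        "alg": header.get("alg"),
        "iss": claims.get("iss"),
        "aud": claims.get("aud"),
        # Negative values mean the token has already expired on this machine's clock
        "seconds_until_expiry": None if exp is None else exp - current,
    }
```

Comparing the reported `alg`, `iss`, and `aud` with the values the verifier expects usually points at the mismatch, and a small negative `seconds_until_expiry` on a freshly issued token suggests clock skew.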

API key comparison sometimes fails for valid keys

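Ensure both sides use the same encoding (UTF-8), for example Buffer.from(key, "utf-8"). Check for trailing whitespace or newlines in environment variables. Note that crypto.timingSafeEqual throws when its two buffers differ in length, so compare fixed-length hashes of the keys or check the lengths first.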

OAuth refresh tokens stop working after server restart

Store refresh tokens in a persistent database, not in-memory. In-memory token stores lose all tokens on restart. Use Redis or a database table to persist refresh token state.
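
As a rough illustration of the database-table option, the sketch below persists refresh token digests in SQLite using Python's standard library. The table layout and the `SqliteRefreshTokenStore` name are assumptions made for this example; a production deployment would more likely use its existing database or Redis.

```python
import hashlib
import sqlite3
import time
from typing import Optional


def _digest(token: str) -> str:
    """Persist digests rather than raw tokens."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


class SqliteRefreshTokenStore:
    def __init__(self, path: str) -> None:
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS refresh_tokens ("
            " token_hash TEXT PRIMARY KEY,"
            " user_id TEXT NOT NULL,"
            " expires_at REAL NOT NULL)"
        )
        self._conn.commit()

    def save(self, token: str, user_id: str, ttl_seconds: float) -> None:
        with self._conn:  # commits on success, rolls back on error
            self._conn.execute(
                "INSERT OR REPLACE INTO refresh_tokens VALUES (?, ?, ?)",
                (_digest(token), user_id, time.time() + ttl_seconds),
            )

    def lookup(self, token: str) -> Optional[str]:
        """Return the user id for a stored, unexpired token, else None."""
        row = self._conn.execute(
            "SELECT user_id, expires_at FROM refresh_tokens WHERE token_hash = ?",
            (_digest(token),),
        ).fetchone()
        if row is None or row[1] <= time.time():
            return None
        return row[0]

    def revoke(self, token: str) -> None:
        with self._conn:
            self._conn.execute(
                "DELETE FROM refresh_tokens WHERE token_hash = ?", (_digest(token),)
            )

    def close(self) -> None:
        self._conn.close()
```

Because the state lives in a file, a new `SqliteRefreshTokenStore` opened on the same path after a restart still recognizes tokens issued before it.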

Next Steps

  • Implement the security fundamentals checklist for your server
  • Set up OAuth 2.0 with your identity provider
  • Add audit logging for all authenticated requests
  • Deploy your authenticated server with Docker or Kubernetes
