root 315326d0a2 feat(middleware): add auth, logging, and audit middleware
- Add authentication middleware with API key validation
- Add request logging middleware for observability
- Add audit logging middleware for admin operations
- Refactor API endpoints to use centralized auth middleware
- Add comprehensive unit tests for all middleware
- Add API documentation and deployment guide
- Update README with health endpoints and documentation links
- Fix test data isolation in router tests

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-03 03:23:37 +08:00

78 lines
2.2 KiB
Python

"""Authentication middleware for API requests."""
from datetime import datetime
from typing import Annotated
from fastapi import Depends, Header, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.database import get_db
from app.models.api_key import APIKey
from app.utils.crypto import verify_api_key
class AuthError(HTTPException):
"""Authentication error."""
def __init__(self, status_code: int, message: str, error_type: str):
super().__init__(
status_code=status_code,
detail={
"error": {
"type": error_type,
"message": message,
}
},
)
async def authenticate_request(
authorization: str | None = Header(None),
x_api_key: str | None = Header(None),
db: AsyncSession = Depends(get_db),
) -> APIKey:
"""
Authenticate request using virtual API key.
Supports both Authorization: Bearer <key> and X-API-Key: <key> formats.
Args:
authorization: Authorization header value.
x_api_key: X-API-Key header value.
db: Database session.
Returns:
The authenticated APIKey object.
Raises:
AuthError: If authentication fails.
"""
# Extract key from header
key = None
if authorization:
if authorization.startswith("Bearer "):
key = authorization[7:]
elif x_api_key:
key = x_api_key
if not key:
raise AuthError(401, "Missing API key", "authentication_error")
# Find and verify key
result = await db.execute(select(APIKey))
api_keys = result.scalars().all()
for api_key in api_keys:
if verify_api_key(key, api_key.key_hash):
if not api_key.enabled:
raise AuthError(403, "API key is disabled", "permission_error")
if api_key.expires_at and api_key.expires_at < datetime.utcnow():
raise AuthError(403, "API key has expired", "permission_error")
return api_key
raise AuthError(401, "Invalid API key", "authentication_error")
# Type alias for dependency injection
AuthenticatedAPIKey = Annotated[APIKey, Depends(authenticate_request)]