Implement a unified LLM Gateway supporting multiple API formats and providers: Features: - OpenAI Chat Completions, Responses API, and Anthropic Messages API - Provider adapters for OpenAI, Anthropic, Azure OpenAI, Google Gemini, AWS Bedrock - Model aliasing with weighted round-robin load balancing - Virtual API keys with RPM/TPM rate limiting - Budget control at key and project levels - Request logging, usage statistics, and audit logs - Fallback/retry with circuit breaker pattern - Admin CRUD APIs for providers, projects, keys, models, usage - Provider health checks Tech stack: - FastAPI with async SQLAlchemy 2.0 - SQLite with aiosqlite - bcrypt for API key hashing, AES-256 for provider key encryption - Docker containerization Tests: 18 passing integration tests for admin API endpoints Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
241 lines
8.3 KiB
Python
241 lines
8.3 KiB
Python
"""AWS Bedrock provider adapter."""
|
|
import json
|
|
import time
|
|
import uuid
|
|
from typing import AsyncIterator, Any
|
|
|
|
try:
|
|
import boto3
|
|
from botocore.config import Config
|
|
from botocore.exceptions import ClientError
|
|
HAS_BOTO3 = True
|
|
except ImportError:
|
|
HAS_BOTO3 = False
|
|
|
|
from app.adapters.base import BaseAdapter, HealthStatus, ProviderConfig
|
|
from app.core.fallback import RetryableError, classify_error
|
|
from app.core.transformer import RequestTransformer
|
|
from app.schemas.anthropic import (
|
|
AnthropicMessagesRequest,
|
|
AnthropicMessagesResponse,
|
|
)
|
|
from app.schemas.openai import (
|
|
OpenAIChatCompletionChunk,
|
|
OpenAIChatCompletionRequest,
|
|
OpenAIChatCompletionResponse,
|
|
OpenAIChatCompletionChoice,
|
|
OpenAIChatMessage,
|
|
OpenAIUsage,
|
|
)
|
|
|
|
|
|
class BedrockAdapter(BaseAdapter):
    """Adapter that serves OpenAI- and Anthropic-style requests via AWS Bedrock.

    Requests are translated to the Anthropic Messages body and sent through the
    Bedrock runtime ``InvokeModel`` / ``InvokeModelWithResponseStream``
    operations. boto3 is synchronous, so every network call is pushed onto a
    worker thread to keep the event loop responsive.
    """

    # Anthropic/Bedrock stop reasons mapped to OpenAI ``finish_reason`` values.
    # Shared by the streaming and non-streaming paths so they always agree.
    _FINISH_REASON_MAP = {
        "end_turn": "stop",
        "max_tokens": "length",
        "stop_sequence": "stop",
        "tool_use": "tool_calls",
    }

    # Version marker that Bedrock requires in an Anthropic Messages body.
    _ANTHROPIC_VERSION = "bedrock-2023-05-31"

    # Sentinel returned by ``next()`` when the event stream is exhausted;
    # StopIteration cannot be propagated cleanly out of a worker thread.
    _STREAM_END = object()

    def __init__(self, config: ProviderConfig):
        """Store the provider config and prepare lazily-created boto3 clients."""
        super().__init__(config)
        self.transformer = RequestTransformer()
        self._client = None
        self._control_client = None

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a blocking callable on a worker thread and await its result."""
        import asyncio  # local import keeps this class self-contained

        return await asyncio.to_thread(func, *args, **kwargs)

    def _build_client(self, service_name: str):
        """Create a boto3 client for ``service_name`` in the configured region.

        The region is read from the provider config's ``region`` entry and
        defaults to ``us-east-1``.
        """
        aws_config = self.config.config or {}
        return boto3.client(
            service_name,
            region_name=aws_config.get("region", "us-east-1"),
            config=Config(
                retries={"max_attempts": 3, "mode": "adaptive"},
                connect_timeout=10,
                read_timeout=120,
            ),
        )

    def _get_client(self):
        """Get or create the Bedrock runtime client.

        Returns:
            The cached ``bedrock-runtime`` client, or ``None`` when boto3 is
            not installed.
        """
        if self._client is None and HAS_BOTO3:
            self._client = self._build_client("bedrock-runtime")
        return self._client

    def _require_client(self):
        """Return the runtime client or fail with a clear error.

        Raises:
            RuntimeError: If boto3 is not installed, instead of the opaque
                ``AttributeError``/``NameError`` a ``None`` client would cause.
        """
        client = self._get_client()
        if client is None:
            raise RuntimeError("boto3 is required to use the AWS Bedrock adapter")
        return client

    def _openai_to_bedrock_anthropic(
        self, request: OpenAIChatCompletionRequest
    ) -> tuple[str, dict[str, Any]]:
        """Convert an OpenAI chat request to a Bedrock Anthropic Messages body.

        Args:
            request: The OpenAI-format chat completion request.

        Returns:
            A ``(model_id, body)`` pair where ``body`` is the JSON-serialisable
            payload for ``InvokeModel``.
        """
        messages = []
        system_parts = []

        for msg in request.messages:
            if msg.role == "system":
                # Collect every string system prompt; previously a later system
                # message silently replaced (or erased) earlier ones.
                if isinstance(msg.content, str):
                    system_parts.append(msg.content)
                continue

            if isinstance(msg.content, str):
                content = [{"type": "text", "text": msg.content}]
            else:
                # Non-string content is forwarded untouched.
                # NOTE(review): assumes it is already Anthropic-compatible
                # content blocks -- confirm against callers.
                content = msg.content
            messages.append({"role": msg.role, "content": content})

        bedrock_request: dict[str, Any] = {
            # Required by Bedrock for Anthropic Messages payloads.
            "anthropic_version": self._ANTHROPIC_VERSION,
            "messages": messages,
            "max_tokens": request.max_tokens or 4096,
        }

        if system_parts:
            bedrock_request["system"] = "\n\n".join(system_parts)
        if request.temperature is not None:
            bedrock_request["temperature"] = request.temperature
        if request.top_p is not None:
            bedrock_request["top_p"] = request.top_p

        # The OpenAI model field is used verbatim as the Bedrock model ID.
        return request.model, bedrock_request

    def _bedrock_to_openai(
        self,
        response: dict[str, Any],
        model: str,
    ) -> OpenAIChatCompletionResponse:
        """Convert a Bedrock response body to an OpenAI chat completion.

        Accepts both the Anthropic Messages shape returned by ``InvokeModel``
        (``content`` / ``stop_reason`` / ``usage.input_tokens``) and the
        Converse shape the original code expected (``output.message.content`` /
        ``stopReason`` / ``usage.inputTokens``).

        Args:
            response: Decoded JSON body from Bedrock.
            model: Model name to echo back in the OpenAI response.

        Returns:
            The equivalent ``OpenAIChatCompletionResponse`` with a single choice.
        """
        blocks = response.get("content")
        if blocks is None:
            blocks = response.get("output", {}).get("message", {}).get("content", [])

        # Only text blocks contribute to the assistant message.
        text = "".join(
            block["text"]
            for block in blocks or []
            if isinstance(block, dict) and "text" in block
        )

        stop_reason = (
            response.get("stop_reason") or response.get("stopReason") or "end_turn"
        )
        finish_reason = self._FINISH_REASON_MAP.get(stop_reason, "stop")

        usage = response.get("usage") or {}
        prompt_tokens = usage.get("input_tokens", usage.get("inputTokens", 0))
        completion_tokens = usage.get("output_tokens", usage.get("outputTokens", 0))

        return OpenAIChatCompletionResponse(
            id=f"bedrock-{uuid.uuid4().hex[:24]}",
            object="chat.completion",
            created=int(time.time()),
            model=model,
            choices=[
                OpenAIChatCompletionChoice(
                    index=0,
                    message=OpenAIChatMessage(role="assistant", content=text),
                    finish_reason=finish_reason,
                )
            ],
            usage=OpenAIUsage(
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                total_tokens=prompt_tokens + completion_tokens,
            ),
        )

    @staticmethod
    def _make_chunk(
        chunk_id: str,
        model: str,
        delta: dict[str, Any],
        finish_reason: Any,
    ) -> OpenAIChatCompletionChunk:
        """Build one OpenAI streaming chunk carrying a single choice."""
        return OpenAIChatCompletionChunk(
            id=chunk_id,
            object="chat.completion.chunk",
            created=int(time.time()),
            model=model,
            choices=[
                {
                    "index": 0,
                    "delta": delta,
                    "finish_reason": finish_reason,
                }
            ],
        )

    async def chat_completions(
        self,
        request: OpenAIChatCompletionRequest,
    ) -> OpenAIChatCompletionResponse:
        """Execute a chat completion request to Bedrock.

        Args:
            request: The OpenAI-format chat completion request.

        Returns:
            The Bedrock reply converted to an OpenAI chat completion.

        Raises:
            RuntimeError: If boto3 is not installed.
            Exception: Whatever ``classify_error`` produces for a botocore
                ``ClientError``; the original error is chained as the cause.
        """
        client = self._require_client()
        model_id, bedrock_request = self._openai_to_bedrock_anthropic(request)

        def _invoke() -> dict[str, Any]:
            # Both the HTTP call and the body read block, so do them together
            # on the worker thread.
            raw = client.invoke_model(
                modelId=model_id,
                contentType="application/json",
                accept="application/json",
                body=json.dumps(bedrock_request),
            )
            return json.loads(raw["body"].read())

        try:
            response_body = await self._run_blocking(_invoke)
        except ClientError as e:
            raise classify_error(Exception(str(e))) from e

        return self._bedrock_to_openai(response_body, request.model)

    async def stream_chat_completions(
        self,
        request: OpenAIChatCompletionRequest,
    ) -> AsyncIterator[OpenAIChatCompletionChunk]:
        """Execute a streaming chat completion request to Bedrock.

        Yields one chunk per non-empty ``content_block_delta`` text fragment
        and a final chunk carrying the finish reason when a ``message_delta``
        event reports a stop reason.

        Args:
            request: The OpenAI-format chat completion request.

        Raises:
            RuntimeError: If boto3 is not installed.
            Exception: Whatever ``classify_error`` produces for a botocore
                ``ClientError``; the original error is chained as the cause.
        """
        client = self._require_client()
        model_id, bedrock_request = self._openai_to_bedrock_anthropic(request)
        chunk_id = f"bedrock-{uuid.uuid4().hex[:24]}"

        try:
            response = await self._run_blocking(
                client.invoke_model_with_response_stream,
                modelId=model_id,
                contentType="application/json",
                accept="application/json",
                body=json.dumps(bedrock_request),
            )

            events = iter(response["body"])
            while True:
                # Each ``next`` waits on the network, so pull events off-loop.
                event = await self._run_blocking(next, events, self._STREAM_END)
                if event is self._STREAM_END:
                    break

                payload = event.get("chunk")
                if not payload:
                    # Events without a chunk payload carry no model output.
                    continue
                chunk_data = json.loads(payload["bytes"])
                event_type = chunk_data.get("type")

                if event_type == "content_block_delta":
                    text = chunk_data.get("delta", {}).get("text", "")
                    if text:
                        yield self._make_chunk(
                            chunk_id, request.model, {"content": text}, None
                        )
                elif event_type == "message_delta":
                    stop_reason = chunk_data.get("delta", {}).get("stop_reason")
                    if stop_reason:
                        # Same mapping as the non-streaming path; previously
                        # every reason except ``end_turn`` became "length".
                        finish_reason = self._FINISH_REASON_MAP.get(
                            stop_reason, "stop"
                        )
                        yield self._make_chunk(
                            chunk_id, request.model, {}, finish_reason
                        )
        except ClientError as e:
            raise classify_error(Exception(str(e))) from e

    async def messages(
        self,
        request: AnthropicMessagesRequest,
    ) -> AnthropicMessagesResponse:
        """Execute an Anthropic Messages API request via Bedrock.

        The request is converted to OpenAI format, run through
        ``chat_completions``, and the result converted back.
        """
        openai_request = self.transformer.anthropic_to_openai(request)
        openai_response = await self.chat_completions(openai_request)
        return self.transformer.openai_response_to_anthropic(openai_response)

    async def check_health(self) -> HealthStatus:
        """Check Bedrock API health.

        Lists foundation models through the ``bedrock`` control-plane client;
        the ``bedrock-runtime`` client does not expose that operation.

        Returns:
            ``HealthStatus.HEALTHY`` when the listing succeeds, otherwise
            ``HealthStatus.UNHEALTHY`` (including when boto3 is missing).
        """
        if not HAS_BOTO3:
            return HealthStatus.UNHEALTHY

        try:
            if self._control_client is None:
                self._control_client = self._build_client("bedrock")
            await self._run_blocking(self._control_client.list_foundation_models)
        except Exception:
            # A health probe reports status rather than raising: any failure
            # (credentials, network, permissions) means "unhealthy".
            return HealthStatus.UNHEALTHY
        return HealthStatus.HEALTHY
|